From 263d1a481bd5378e59b804425bc8a9bdc9bebb9a Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Wed, 26 Sep 2018 16:03:45 +0200 Subject: Fix delivery to DataRouter The messages to the DataRouter was not actually sent. Change-Id: I5748ee0cc19a5049ca4d965caefb5cdf2204419f Issue-ID: DCAEGEN2-841 Signed-off-by: elinuxhenrik --- .../collectors/datafile/ftp/FileCollector.java | 117 --------------------- .../collectors/datafile/ftp/FtpsClient.java | 16 +-- .../collectors/datafile/model/FileData.java | 2 + .../datafile/service/DmaapConsumerJsonParser.java | 40 +++---- .../datafile/tasks/DmaapConsumerTask.java | 11 +- .../datafile/tasks/DmaapConsumerTaskImpl.java | 33 ++---- .../datafile/tasks/DmaapPublisherTask.java | 10 +- .../datafile/tasks/DmaapPublisherTaskImpl.java | 22 ++-- .../collectors/datafile/tasks/ScheduledTasks.java | 62 +++++------ .../datafile/tasks/XnfCollectorTask.java | 31 ++++++ .../datafile/tasks/XnfCollectorTaskImpl.java | 116 ++++++++++++++++++++ 11 files changed, 230 insertions(+), 230 deletions(-) delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java (limited to 'datafile-app-server/src/main') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java deleted file mode 100644 index 1e2dcc91..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java +++ /dev/null @@ -1,117 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Ericsson. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.io.FilenameUtils; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Mono; - -/** - * @author Henrik Andersson - * - */ -@Component -public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito - // 2.x for testing so it is left for later improvement. - private static final String FTPES = "ftpes"; - private static final String FTPS = "ftps"; - private static final String SFTP = "sftp"; - - private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); - - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; - - @Autowired - protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) { - this.ftpsClient = ftpsCleint; - this.sftpClient = sftpClient; - } - - public Mono> getFilesFromSender(List listOfFileData) { - List consumerModels = new ArrayList(); - for (FileData fileData : listOfFileData) { - String localFile = collectFile(fileData); - - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - consumerModels.add(consumerDmaapModel); - } - } - return Mono.just(consumerModels); - } - - private String collectFile(FileData fileData) { - String location = fileData.location(); - URI uri = URI.create(location); - String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); - FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(uri.getHost()) - .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "") - .port(uri.getPort()).build(); - String remoteFile = uri.getPath(); - String localFile = "target/" + FilenameUtils.getName(remoteFile); - String scheme = uri.getScheme(); - - boolean fileDownloaded = false; - if (FTPES.equals(scheme) || FTPS.equals(scheme)) { - fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile); - } else if (SFTP.equals(scheme)) { - fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile); - } else { - - logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and " - + SFTP + ". " + fileData); - localFile = null; - } - if (!fileDownloaded) { - localFile = null; - } - return localFile; - } - - private String[] getUserNameAndPasswordIfGiven(String userInfoString) { - String[] userInfo = null; - if (userInfoString != null && !userInfoString.isEmpty()) { - userInfo = userInfoString.split(":"); - } - return userInfo; - } - - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { - String compression = fileData.compression(); - String fileFormatType = fileData.fileFormatType(); - String fileFormatVersion = fileData.fileFormatVersion(); - - return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression) - .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build(); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index a3f75826..89f5bbf2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -17,7 +17,6 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -44,6 +43,8 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) { + logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData, + remoteFile, localFile); boolean result = true; try { FTPSClient ftps = new FTPSClient("TLS"); @@ -56,9 +57,10 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit closeDownConnection(ftps); } } catch (IOException ex) { - logger.error("Unable to collect file from xNF. " + fileServerData, ex); + logger.error("Unable to collect file from xNF. Data: {}", fileServerData, ex); result = false; } + logger.trace("collectFile left with result: {}", result); return result; } @@ -71,7 +73,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit if (!ftps.login(fileServerData.userId(), fileServerData.password())) { ftps.logout(); - logger.error("Unable to log in to xNF. " + fileServerData); + logger.error("Unable to log in to xNF. {}", fileServerData); success = false; } @@ -79,13 +81,13 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit int reply = ftps.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftps.disconnect(); - logger.error("Unable to connect in to xNF. " + fileServerData); + logger.error("Unable to connect in to xNF. {}", fileServerData); success = false; } ftps.enterLocalPassiveMode(); } } catch (Exception ex) { - logger.error("Unable to connect to xNF." + fileServerData, ex); + logger.error("Unable to connect to xNF. Data: {}", fileServerData, ex); success = false; } @@ -93,7 +95,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit } private void getFile(String remoteFile, String localFile, FTPSClient ftps) - throws IOException, FileNotFoundException { + throws IOException { OutputStream output; File outfile = new File(localFile); outfile.createNewFile(); @@ -103,7 +105,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit ftps.retrieveFile(remoteFile, output); output.close(); - logger.debug("File " + outfile.getName() + " Download Successfull from xNF"); + logger.debug("File {} Download Successfull from xNF", outfile.getName()); } private void closeDownConnection(FTPSClient ftps) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 48c4896f..221f5cb6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -33,6 +33,8 @@ public interface FileData { String changeType(); + String name(); + String location(); String compression(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index e4afd3ae..7226dfa8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -28,12 +28,13 @@ import java.util.stream.StreamSupport; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -66,8 +67,8 @@ public class DmaapConsumerJsonParser { * @param rawMessage - results from DMaaP * @return reactive Mono with an array of FileData */ - public Mono> getJsonObject(Mono rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Flux getJsonObject(Mono rawMessage) { + return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); } private Mono getJsonParserMessage(String message) { @@ -75,12 +76,12 @@ public class DmaapConsumerJsonParser { : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Mono> createJsonConsumerModel(JsonElement jsonElement) { + private Flux createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) : getFileDataFromJsonArray(jsonElement); } - private Mono> getFileDataFromJsonArray(JsonElement jsonElement) { + private Flux getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } @@ -89,13 +90,13 @@ public class DmaapConsumerJsonParser { return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); } - private Mono> create(Mono jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) - ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + private Flux create(Mono jsonObject) { + return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) + ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) : transform(monoJsonP)); } - private Mono> transform(JsonObject jsonObject) { + private Flux transform(JsonObject jsonObject) { if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) { JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); @@ -105,26 +106,25 @@ public class DmaapConsumerJsonParser { if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) && arrayOfNamedHashMap != null) { - Mono> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); - return res; + return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); } if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); } else if (arrayOfNamedHashMap != null) { - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); } - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); } - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); } - private Mono> getAllFileDataFromJson(String changeIdentifier, String changeType, + private Flux getAllFileDataFromJson(String changeIdentifier, String changeType, JsonArray arrayOfAdditionalFields) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { @@ -135,11 +135,11 @@ public class DmaapConsumerJsonParser { if (fileData != null) { res.add(fileData); } else { - logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo); + logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); } } } - return Mono.just(res); + return Flux.fromIterable(res); } private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) { @@ -154,7 +154,7 @@ public class DmaapConsumerJsonParser { if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { - fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) + fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) .location(location).compression(compression).fileFormatType(fileFormatType) .fileFormatVersion(fileFormatVersion).build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index 0c76fc17..32fdbdc7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -18,17 +18,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -37,7 +34,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono> consume(Mono message) throws DmaapNotFoundException; + abstract Flux consume(Mono message) throws DmaapNotFoundException; abstract DmaapConsumerReactiveHttpClient resolveClient(); @@ -45,7 +42,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono> execute(String object) throws DatafileTaskException; + protected abstract Flux execute(String object); WebClient buildWebClient() { return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index 839e03c9..7ec474ca 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -16,21 +16,18 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -45,42 +42,32 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private Config datafileAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - FileCollector fileCollector; @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) { + public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); - this.fileCollector = fileCollector; } protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.fileCollector = fileCollector; } @Override - Mono> consume(Mono message) { - logger.trace("Method called with arg {}", message); + Flux consume(Mono message) { + logger.trace("consume called with arg {}", message.toString()); return dmaapConsumerJsonParser.getJsonObject(message); } - private Mono> getFilesFromSender(List listOfFileData) { - Mono> filesFromSender = fileCollector.getFilesFromSender(listOfFileData); - return filesFromSender; - } - @Override - protected Mono> execute(String object) { + protected Flux execute(String object) { dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", object); - Mono> consumerResult = - consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); - return consumerResult.flatMap(this::getFilesFromSender); + logger.trace("execute called with arg {}", object); + return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java index 716b52c1..0b81df5b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java @@ -16,16 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author Przemysław Wąsala on 3/23/18 @@ -33,14 +30,13 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono publish(Mono> consumerDmaapModel) throws DatafileTaskException; + abstract Flux publish(ConsumerDmaapModel consumerDmaapModel); abstract DmaapProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono execute(Mono> consumerDmaapModel) - throws DatafileTaskException; + protected abstract Flux execute(ConsumerDmaapModel consumerDmaapModel); WebClient buildWebClient() { return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 5779051c..b4ee3a9d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -16,13 +16,9 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.slf4j.Logger; @@ -30,7 +26,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author Przemysław Wąsala on 4/13/18 @@ -49,20 +45,16 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Mono publish(Mono> consumerDmaapModels) { - logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); + public Flux publish(ConsumerDmaapModel consumerDmaapModel) { + logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel); + return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); } @Override - public Mono execute(Mono> consumerDmaapModels) - throws DatafileTaskException { - if (consumerDmaapModels == null) { - throw new DmaapNotFoundException("Invoked null object to DMaaP task"); - } + public Flux execute(ConsumerDmaapModel consumerDmaapModel) { dmaapProducerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", consumerDmaapModels); - return publish(consumerDmaapModels); + logger.trace("Method called with arg {}", consumerDmaapModel); + return publish(consumerDmaapModel); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 14085bb8..c263c95c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -2,35 +2,29 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; -import java.util.concurrent.Callable; - -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.Flux; /** * @author Przemysław Wąsala on 3/23/18 @@ -42,17 +36,21 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final DmaapConsumerTask dmaapConsumerTask; + private final XnfCollectorTask xnfCollectorTask; private final DmaapPublisherTask dmaapProducerTask; /** * Constructor for task registration in Datafile Workflow. * * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - second task + * @param xnfCollectorTask - second task + * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) { + public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, + DmaapPublisherTask dmaapPublisherTask) { this.dmaapConsumerTask = dmaapConsumerTask; + this.xnfCollectorTask = xnfCollectorTask; this.dmaapProducerTask = dmaapPublisherTask; } @@ -62,12 +60,10 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); - Mono dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); - - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + consumeFromDmaapMessage() + .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) + .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration) + .subscribe(this::onSuccess, this::onError, this::onComplete); } private void onComplete() { @@ -84,18 +80,16 @@ public class ScheduledTasks { } } - private Callable>> consumeFromDmaapMessage() { - return () -> { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); - }; + private Flux consumeFromDmaapMessage() { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); } - private Mono publishToDmaapConfiguration(Mono> monoModel) { - try { - return dmaapProducerTask.execute(monoModel); - } catch (DatafileTaskException e) { - return Mono.error(e); - } + private Flux collectFilesFromXnf(FileData fileData) { + return xnfCollectorTask.execute(fileData); + } + + private Flux publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { + return dmaapProducerTask.execute(monoModel); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java new file mode 100644 index 00000000..66d59ae8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; + +import reactor.core.publisher.Flux; + +/** + * @author Henrik Andersson + */ +public interface XnfCollectorTask { + Flux execute(FileData fileData); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java new file mode 100644 index 00000000..a29fb092 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java @@ -0,0 +1,116 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import java.io.File; +import java.net.URI; + +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import reactor.core.publisher.Flux; + +/** + * @author Henrik Andersson + */ +@Component +public class XnfCollectorTaskImpl implements XnfCollectorTask { + + private static final String FTPES = "ftpes"; + private static final String FTPS = "ftps"; + private static final String SFTP = "sftp"; + + private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); + + private final FtpsClient ftpsClient; + private final SftpClient sftpClient; + + @Autowired + protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) { + this.ftpsClient = ftpsCleint; + this.sftpClient = sftpClient; + } + + @Override + public Flux execute(FileData fileData) { + logger.trace("Entering execute with {}", fileData); + String localFile = collectFile(fileData); + + if (localFile != null) { + ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); + logger.trace("Exiting execute with {}", consumerDmaapModel); + return Flux.just(consumerDmaapModel); + } + logger.trace("Exiting execute with empty"); + return Flux.empty(); + } + + private String collectFile(FileData fileData) { + String location = fileData.location(); + URI uri = URI.create(location); + String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo()); + FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(uri.getHost()) + .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "") + .port(uri.getPort()).build(); + String remoteFile = uri.getPath(); + String localFile = "target" + File.separator + fileData.name(); + String scheme = uri.getScheme(); + + boolean fileDownloaded = false; + if (FTPES.equals(scheme) || FTPS.equals(scheme)) { + fileDownloaded = ftpsClient.collectFile(fileServerData, remoteFile, localFile); + } else if (SFTP.equals(scheme)) { + fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile); + } else { + + logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, + FTPES, FTPS, SFTP, fileData); + localFile = null; + } + if (!fileDownloaded) { + localFile = null; + } + return localFile; + } + + private String[] getUserNameAndPasswordIfGiven(String userInfoString) { + String[] userInfo = null; + if (userInfoString != null && !userInfoString.isEmpty()) { + userInfo = userInfoString.split(":"); + } + return userInfo; + } + + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { + String name = fileData.name(); + String compression = fileData.compression(); + String fileFormatType = fileData.fileFormatType(); + String fileFormatVersion = fileData.fileFormatVersion(); + + return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression) + .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build(); + } +} -- cgit 1.2.3-korg