diff options
21 files changed, 448 insertions, 491 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java index a3f75826..89f5bbf2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java @@ -17,7 +17,6 @@ package org.onap.dcaegen2.collectors.datafile.ftp; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -44,6 +43,8 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class); public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) { + logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData, + remoteFile, localFile); boolean result = true; try { FTPSClient ftps = new FTPSClient("TLS"); @@ -56,9 +57,10 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit closeDownConnection(ftps); } } catch (IOException ex) { - logger.error("Unable to collect file from xNF. " + fileServerData, ex); + logger.error("Unable to collect file from xNF. Data: {}", fileServerData, ex); result = false; } + logger.trace("collectFile left with result: {}", result); return result; } @@ -71,7 +73,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit if (!ftps.login(fileServerData.userId(), fileServerData.password())) { ftps.logout(); - logger.error("Unable to log in to xNF. " + fileServerData); + logger.error("Unable to log in to xNF. {}", fileServerData); success = false; } @@ -79,13 +81,13 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit int reply = ftps.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftps.disconnect(); - logger.error("Unable to connect in to xNF. " + fileServerData); + logger.error("Unable to connect in to xNF. {}", fileServerData); success = false; } ftps.enterLocalPassiveMode(); } } catch (Exception ex) { - logger.error("Unable to connect to xNF." + fileServerData, ex); + logger.error("Unable to connect to xNF. Data: {}", fileServerData, ex); success = false; } @@ -93,7 +95,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit } private void getFile(String remoteFile, String localFile, FTPSClient ftps) - throws IOException, FileNotFoundException { + throws IOException { OutputStream output; File outfile = new File(localFile); outfile.createNewFile(); @@ -103,7 +105,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit ftps.retrieveFile(remoteFile, output); output.close(); - logger.debug("File " + outfile.getName() + " Download Successfull from xNF"); + logger.debug("File {} Download Successfull from xNF", outfile.getName()); } private void closeDownConnection(FTPSClient ftps) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 48c4896f..221f5cb6 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -33,6 +33,8 @@ public interface FileData { String changeType(); + String name(); + String location(); String compression(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index e4afd3ae..7226dfa8 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -28,12 +28,13 @@ import java.util.stream.StreamSupport; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -66,8 +67,8 @@ public class DmaapConsumerJsonParser { * @param rawMessage - results from DMaaP * @return reactive Mono with an array of FileData */ - public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) { - return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel); + public Flux<FileData> getJsonObject(Mono<String> rawMessage) { + return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { @@ -75,12 +76,12 @@ public class DmaapConsumerJsonParser { : Mono.fromSupplier(() -> new JsonParser().parse(message)); } - private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) { + private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) : getFileDataFromJsonArray(jsonElement); } - private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) { + private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } @@ -89,13 +90,13 @@ public class DmaapConsumerJsonParser { return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject()); } - private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) - ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + private Flux<FileData> create(Mono<JsonObject> jsonObject) { + return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP) + ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) : transform(monoJsonP)); } - private Mono<List<FileData>> transform(JsonObject jsonObject) { + private Flux<FileData> transform(JsonObject jsonObject) { if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) { JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER); @@ -105,26 +106,25 @@ public class DmaapConsumerJsonParser { if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) && arrayOfNamedHashMap != null) { - Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); - return res; + return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); } if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); } else if (arrayOfNamedHashMap != null) { - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); } - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); } - return Mono.error( + return Flux.error( new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); } - private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType, + private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType, JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { @@ -135,11 +135,11 @@ public class DmaapConsumerJsonParser { if (fileData != null) { res.add(fileData); } else { - logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo); + logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo); } } } - return Mono.just(res); + return Flux.fromIterable(res); } private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) { @@ -154,7 +154,7 @@ public class DmaapConsumerJsonParser { if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { - fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) + fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType) .location(location).compression(compression).fileFormatType(fileFormatType) .fileFormatVersion(fileFormatVersion).build(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index 0c76fc17..32fdbdc7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -18,17 +18,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -37,7 +34,7 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono<List<FileData>> consume(Mono<String> message) throws DmaapNotFoundException; + abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException; abstract DmaapConsumerReactiveHttpClient resolveClient(); @@ -45,7 +42,7 @@ abstract class DmaapConsumerTask { protected abstract DmaapConsumerConfiguration resolveConfiguration(); - protected abstract Mono<List<ConsumerDmaapModel>> execute(String object) throws DatafileTaskException; + protected abstract Flux<FileData> execute(String object); WebClient buildWebClient() { return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index 839e03c9..7ec474ca 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -16,21 +16,18 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -45,42 +42,32 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private Config datafileAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; - FileCollector fileCollector; @Autowired - public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) { + public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser(); - this.fileCollector = fileCollector; } protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.fileCollector = fileCollector; } @Override - Mono<List<FileData>> consume(Mono<String> message) { - logger.trace("Method called with arg {}", message); + Flux<FileData> consume(Mono<String> message) { + logger.trace("consume called with arg {}", message.toString()); return dmaapConsumerJsonParser.getJsonObject(message); } - private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) { - Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData); - return filesFromSender; - } - @Override - protected Mono<List<ConsumerDmaapModel>> execute(String object) { + protected Flux<FileData> execute(String object) { dmaaPConsumerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", object); - Mono<List<FileData>> consumerResult = - consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); - return consumerResult.flatMap(this::getFilesFromSender); + logger.trace("execute called with arg {}", object); + return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java index 716b52c1..0b81df5b 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java @@ -16,16 +16,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -33,14 +30,13 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) throws DatafileTaskException; + abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel); abstract DmaapProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) - throws DatafileTaskException; + protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel); WebClient buildWebClient() { return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 5779051c..b4ee3a9d 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -16,13 +16,9 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; - import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.Config; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.slf4j.Logger; @@ -30,7 +26,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -49,20 +45,16 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) { - logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); - return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); + public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) { + logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel); + return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel); } @Override - public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) - throws DatafileTaskException { - if (consumerDmaapModels == null) { - throw new DmaapNotFoundException("Invoked null object to DMaaP task"); - } + public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) { dmaapProducerReactiveHttpClient = resolveClient(); - logger.trace("Method called with arg {}", consumerDmaapModels); - return publish(consumerDmaapModels); + logger.trace("Method called with arg {}", consumerDmaapModel); + return publish(consumerDmaapModel); } @Override diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 14085bb8..c263c95c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -2,35 +2,29 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.tasks; -import java.util.List; -import java.util.concurrent.Callable; - -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -42,17 +36,21 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final DmaapConsumerTask dmaapConsumerTask; + private final XnfCollectorTask xnfCollectorTask; private final DmaapPublisherTask dmaapProducerTask; /** * Constructor for task registration in Datafile Workflow. * * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - second task + * @param xnfCollectorTask - second task + * @param dmaapPublisherTask - third task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) { + public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask, + DmaapPublisherTask dmaapPublisherTask) { this.dmaapConsumerTask = dmaapConsumerTask; + this.xnfCollectorTask = xnfCollectorTask; this.dmaapProducerTask = dmaapPublisherTask; } @@ -62,12 +60,10 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); - - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + consumeFromDmaapMessage() + .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP")) + .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration) + .subscribe(this::onSuccess, this::onError, this::onComplete); } private void onComplete() { @@ -84,18 +80,16 @@ public class ScheduledTasks { } } - private Callable<Mono<List<ConsumerDmaapModel>>> consumeFromDmaapMessage() { - return () -> { - dmaapConsumerTask.initConfigs(); - return dmaapConsumerTask.execute(""); - }; + private Flux<FileData> consumeFromDmaapMessage() { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); } - private Mono<String> publishToDmaapConfiguration(Mono<List<ConsumerDmaapModel>> monoModel) { - try { - return dmaapProducerTask.execute(monoModel); - } catch (DatafileTaskException e) { - return Mono.error(e); - } + private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) { + return xnfCollectorTask.execute(fileData); + } + + private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) { + return dmaapProducerTask.execute(monoModel); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java new file mode 100644 index 00000000..66d59ae8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; + +import reactor.core.publisher.Flux; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + */ +public interface XnfCollectorTask { + Flux<ConsumerDmaapModel> execute(FileData fileData); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java index 1e2dcc91..a29fb092 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java @@ -1,73 +1,71 @@ -/*- +/* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Ericsson. All rights reserved. + * Copyright (C) 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.ftp; +package org.onap.dcaegen2.collectors.datafile.tasks; +import java.io.File; import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import org.apache.commons.io.FilenameUtils; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - * */ @Component -public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito - // 2.x for testing so it is left for later improvement. +public class XnfCollectorTaskImpl implements XnfCollectorTask { + private static final String FTPES = "ftpes"; private static final String FTPS = "ftps"; private static final String SFTP = "sftp"; - private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); + private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class); private final FtpsClient ftpsClient; private final SftpClient sftpClient; @Autowired - protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) { + protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) { this.ftpsClient = ftpsCleint; this.sftpClient = sftpClient; } - public Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) { - List<ConsumerDmaapModel> consumerModels = new ArrayList<ConsumerDmaapModel>(); - for (FileData fileData : listOfFileData) { - String localFile = collectFile(fileData); + @Override + public Flux<ConsumerDmaapModel> execute(FileData fileData) { + logger.trace("Entering execute with {}", fileData); + String localFile = collectFile(fileData); - if (localFile != null) { - ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); - consumerModels.add(consumerDmaapModel); - } + if (localFile != null) { + ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile); + logger.trace("Exiting execute with {}", consumerDmaapModel); + return Flux.just(consumerDmaapModel); } - return Mono.just(consumerModels); + logger.trace("Exiting execute with empty"); + return Flux.empty(); } private String collectFile(FileData fileData) { @@ -78,7 +76,7 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "") .port(uri.getPort()).build(); String remoteFile = uri.getPath(); - String localFile = "target/" + FilenameUtils.getName(remoteFile); + String localFile = "target" + File.separator + fileData.name(); String scheme = uri.getScheme(); boolean fileDownloaded = false; @@ -88,8 +86,8 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile); } else { - logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and " - + SFTP + ". " + fileData); + logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme, + FTPES, FTPS, SFTP, fileData); localFile = null; } if (!fileDownloaded) { @@ -107,11 +105,12 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow } private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) { + String name = fileData.name(); String compression = fileData.compression(); String fileFormatType = fileData.fileFormatType(); String fileFormatVersion = fileData.fileFormatVersion(); - return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression) + return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression) .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java deleted file mode 100644 index 2f61ac97..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.ftp; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.model.FileData; -import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; - -import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -public class FileCollectorTest { - - private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES"; - private static final String FILE_READY_CHANGE_TYPE = "FileReady"; - private static final String FTPES_SCHEME = "ftpes://"; - private static final String SFTP_SCHEME = "sftp://"; - private static final String SERVER_ADDRESS = "192.168.0.101"; - private static final int PORT_22 = 22; - private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; - private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME; - private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; - private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; - private static final String GZIP_COMPRESSION = "gzip"; - private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; - private static final String FILE_FORMAT_VERSION = "V10"; - - private FtpsClient ftpsClientMock = mock(FtpsClient.class); - - private SftpClient sftpClientMock = mock(SftpClient.class); - - private FileCollector fileCollectorUndetTest = new FileCollector(ftpsClientMock, sftpClientMock); - - @Test - public void whenSingleFtpesFile_returnCorrectResponse() { - List<FileData> listOfFileData = new ArrayList<FileData>(); - listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) - .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build()); - - FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22) - .userId("").password("").build(); - when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true); - - Mono<List<ConsumerDmaapModel>> consumerModelsMono = - fileCollectorUndetTest.getFilesFromSender(listOfFileData); - - List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block(); - assertEquals(1, consumerModels.size()); - ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0); - assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression()); - assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType()); - assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion()); - assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation()); - FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS) - .userId("").password("").port(PORT_22).build(); - verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verifyNoMoreInteractions(ftpsClientMock); - } - - @Test - public void whenSingleSftpFile_returnCorrectResponse() { - List<FileData> listOfFileData = new ArrayList<FileData>(); - listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) - .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build()); - - FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22) - .userId("").password("").build(); - when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true); - - Mono<List<ConsumerDmaapModel>> consumerModelsMono = - fileCollectorUndetTest.getFilesFromSender(listOfFileData); - - List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block(); - assertEquals(1, consumerModels.size()); - ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0); - assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression()); - assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType()); - assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion()); - assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation()); - FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS) - .userId("").password("").port(PORT_22).build(); - verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - verifyNoMoreInteractions(ftpsClientMock); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java index 8c36a51f..b5457b82 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java @@ -16,11 +16,11 @@ package org.onap.dcaegen2.collectors.datafile.service; -import static org.junit.Assert.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.spy; -import java.util.List; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + import java.util.Optional; import org.junit.jupiter.api.Test; @@ -31,9 +31,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -42,19 +39,27 @@ import reactor.test.StepVerifier; * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ class DmaapConsumerJsonParserTest { + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME; + private static final String GZIP_COMPRESSION = "gzip"; + private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; + private static final String FILE_FORMAT_VERSION = "V10"; + private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; + private static final String CHANGE_TYPE = "FileReady"; + private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") - - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) + .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField).build(); - FileData expectedFileData = ImmutableFileData.builder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz") - .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); + FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); String messageString = message.toString(); String parsedString = message.getParsed(); @@ -63,19 +68,18 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block(); - - assertNotNull(fileDataResult); - assertEquals(expectedFileData, fileDataResult.get(0)); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNext(expectedFileData).verifyComplete(); } @Test void whenPassingCorrectJsonWihoutName_noFileData() { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build(); + AdditionalField additionalField = + new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField).build(); String messageString = message.toString(); String parsedString = message.getParsed(); @@ -84,18 +88,18 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block(); - - assertNotNull(fileDataResult); - assertEquals(0, fileDataResult.size()); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNextCount(0).verifyComplete(); } @Test void whenPassingCorrectJsonWihoutLocation_noFileData() { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") - .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build(); + AdditionalField additionalField = + new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField).build(); String messageString = message.toString(); String parsedString = message.getParsed(); @@ -104,19 +108,17 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block(); - - assertNotNull(fileDataResult); - assertEquals(0, fileDataResult.size()); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNextCount(0).verifyComplete(); } @Test void whenPassingCorrectJsonWihoutCompression_noFileData() { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz") - .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) + .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField).build(); String messageString = message.toString(); String parsedString = message.getParsed(); @@ -125,19 +127,17 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block(); - - assertNotNull(fileDataResult); - assertEquals(0, fileDataResult.size()); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNextCount(0).verifyComplete(); } @Test void whenPassingCorrectJsonWihoutFileFormatType_noFileData() { - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip") - .fileFormatVersion("V10").build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) + .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalField).build(); String messageString = message.toString(); String parsedString = message.getParsed(); @@ -146,24 +146,24 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block(); - - assertNotNull(fileDataResult); - assertEquals(0, fileDataResult.size()); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNextCount(0).verifyComplete(); } @Test void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() { - AdditionalField additionalFaultyField = - new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec").build(); - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") - .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") - .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalFaultyField) - .addAdditionalField(additionalField).build(); + AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME) + .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION) + .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION) + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) + .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build(); + + FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER) + .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); String messageString = message.toString(); String parsedString = message.getParsed(); @@ -172,10 +172,8 @@ class DmaapConsumerJsonParserTest { Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser) .getJsonObjectFromAnArray(jsonElement); - List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block(); - - assertNotNull(fileDataResult); - assertEquals(1, fileDataResult.size()); + StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription() + .expectNext(expectedFileData).verifyComplete(); } @Test diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java index e6818453..43502b49 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; @@ -35,16 +34,16 @@ import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfig import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -53,7 +52,7 @@ import reactor.test.StepVerifier; * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ class DmaapConsumerTaskImplTest { - private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES"; + private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES"; private static final String FILE_READY_CHANGE_TYPE = "FileReady"; private static final String FTPES_SCHEME = "ftpes://"; private static final String SFTP_SCHEME = "sftp://"; @@ -75,13 +74,11 @@ class DmaapConsumerTaskImplTest { private DmaapConsumerTaskImpl dmaapConsumerTask; private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; - private static FileCollector fileCollectorMock; - private static String ftpesMessage; - private static List<FileData> ftpesFileDataAfterConsume = new ArrayList<FileData>(); + private static FileData ftpesFileData; private static String sftpMessage; - private static List<FileData> sftpFileDataAfterConsume = new ArrayList<FileData>(); + private static FileData sftpFileData; @BeforeAll public static void setUp() { @@ -95,42 +92,38 @@ class DmaapConsumerTaskImplTest { AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder().location(FTPES_LOCATION).compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) + JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) .changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0") .addAdditionalField(ftpesAdditionalField).build(); ftpesMessage = ftpesJsonMessage.toString(); - FileData ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) - .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - ftpesFileDataAfterConsume.add(ftpesFileData); + ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) + .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION).build(); AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder().location(SFTP_LOCATION).compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) + JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) .changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0") .addAdditionalField(sftpAdditionalField).build(); sftpMessage = sftpJsonMessage.toString(); - FileData sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) - .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - sftpFileDataAfterConsume.add(sftpFileData); + sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) + .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION) + .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION).build(); - ImmutableConsumerDmaapModel consumerDmaapModel = - ImmutableConsumerDmaapModel.builder().location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME) + .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); listOfConsumerDmaapModel.add(consumerDmaapModel); - - fileCollectorMock = mock(FileCollector.class); } @Test public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { - // given - prepareMocksForDmaapConsumer("", new ArrayList<FileData>()); + prepareMocksForDmaapConsumer("", null); - // then StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription() .expectError(DmaapEmptyResponseException.class).verify(); @@ -139,51 +132,39 @@ class DmaapConsumerTaskImplTest { @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { - // given - prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileDataAfterConsume); - // when - final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block(); - // then + prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData); + + StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete(); + verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); - verify(fileCollectorMock, times(1)).getFilesFromSender(ftpesFileDataAfterConsume); - verifyNoMoreInteractions(fileCollectorMock); - Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse); - } @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { - // given - prepareMocksForDmaapConsumer(sftpMessage, sftpFileDataAfterConsume); - // when - final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block(); - // then + prepareMocksForDmaapConsumer(sftpMessage, sftpFileData); + + StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete(); + verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse(); verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); - verify(fileCollectorMock, times(1)).getFilesFromSender(sftpFileDataAfterConsume); - verifyNoMoreInteractions(fileCollectorMock); - Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse); - } - private void prepareMocksForDmaapConsumer(String message, List<FileData> fileDataAfterConsume) { + private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) { Mono<String> messageAsMono = Mono.just(message); DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class); dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class); when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono); if (!message.isEmpty()) { - when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Mono.just(fileDataAfterConsume)); + when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume)); } else { when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)) - .thenReturn(Mono.error(new DmaapEmptyResponseException())); + .thenReturn(Flux.error(new DmaapEmptyResponseException())); } - when(fileCollectorMock.getFilesFromSender(fileDataAfterConsume)) - .thenReturn(Mono.just(listOfConsumerDmaapModel)); - dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, - dmaapConsumerJsonParserMock, fileCollectorMock)); + dmaapConsumerTask = + spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock)); when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient(); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java index 4f7787e9..c124e982 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java @@ -2,17 +2,15 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ @@ -27,32 +25,27 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.util.ArrayList; -import java.util.List; - -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.function.Executable; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ class DmaapPublisherTaskImplTest { + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static ConsumerDmaapModel consumerDmaapModel; - private static List<ConsumerDmaapModel> listOfConsumerDmaapModel; private static DmaapPublisherTaskImpl dmaapPublisherTask; private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; private static AppConfig appConfig; @@ -64,48 +57,17 @@ class DmaapPublisherTaskImplTest { new ImmutableDmaapPublisherConfiguration.Builder().dmaapContentType("application/json") .dmaapHostName("54.45.33.2").dmaapPortNumber(1234).dmaapProtocol("https").dmaapUserName("DFC") .dmaapUserPassword("DFC").dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT").build(); - consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location("target/A20161224.1030-1045.bin.gz") + consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME).location("target/" + PM_FILE_NAME) .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); - listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>(); - listOfConsumerDmaapModel.add(consumerDmaapModel); appConfig = mock(AppConfig.class); } @Test - public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { - // given - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); - dmaapPublisherTask = new DmaapPublisherTaskImpl(appConfig); - - // when - Executable executableFunction = () -> dmaapPublisherTask.execute(null); - - // then - Assertions.assertThrows(DatafileTaskException.class, executableFunction, - "The specified parameter is incorrect"); - } - - @Test - public void whenPassedObjectFits_ReturnsCorrectStatus() throws DatafileTaskException { - // given + public void whenPassedObjectFits_ReturnsCorrectStatus() { prepareMocksForTests(HttpStatus.OK.value()); - // when - dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel)); - - // then - verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); - verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); - } - - @Test - public void whenPassedObjectFits_ReturnsNoContent() throws DatafileTaskException { - // given - prepareMocksForTests(HttpStatus.NO_CONTENT.value()); - - dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel)); + StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete(); - // then verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any()); verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); } @@ -113,7 +75,7 @@ class DmaapPublisherTaskImplTest { private void prepareMocksForTests(Integer httpResponseCode) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())) - .thenReturn(Mono.just(httpResponseCode.toString())); + .thenReturn(Flux.just(httpResponseCode.toString())); when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java new file mode 100644 index 00000000..528a481c --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -0,0 +1,131 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.File; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; +import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData; +import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; + +import reactor.test.StepVerifier; + +/** + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * + */ +public class XnfCollectorTaskImplTest { + + private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES"; + private static final String FILE_READY_CHANGE_TYPE = "FileReady"; + private static final String FTPES_SCHEME = "ftpes://"; + private static final String SFTP_SCHEME = "sftp://"; + private static final String SERVER_ADDRESS = "192.168.0.101"; + private static final int PORT_22 = 22; + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; + private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME; + private static final String USER = "usr"; + private static final String PWD = "pwd"; + private static final String FTPES_LOCATION = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String GZIP_COMPRESSION = "gzip"; + private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; + private static final String FILE_FORMAT_VERSION = "V10"; + + private FtpsClient ftpsClientMock = mock(FtpsClient.class); + + private SftpClient sftpClientMock = mock(SftpClient.class); + + private XnfCollectorTask collectorUndetTest = new XnfCollectorTaskImpl(ftpsClientMock, sftpClientMock); + + @Test + public void whenSingleFtpesFile_returnCorrectResponse() { + FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) + .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION) + .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION).build(); + + FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER) + .password(PWD).port(PORT_22).build(); + when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) + .thenReturn(Boolean.TRUE); + + ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME) + .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + + StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) + .verifyComplete(); + + verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verifyNoMoreInteractions(ftpsClientMock); + } + + @Test + public void whenSingleSftpFile_returnCorrectResponse() { + FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) + .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION) + .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION).build(); + + FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("") + .password("").port(PORT_22).build(); + when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) + .thenReturn(Boolean.TRUE); + + ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME) + .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + + StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel) + .verifyComplete(); + + verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); + verifyNoMoreInteractions(ftpsClientMock); + } + + @Test + public void whenWrongScheme_returnEmpty() { + FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) + .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location("http://host.com/file.zip") + .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) + .fileFormatVersion(FILE_FORMAT_VERSION).build(); + + FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("") + .password("").port(PORT_22).build(); + when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)) + .thenReturn(Boolean.TRUE); + + StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete(); + + verifyNoMoreInteractions(ftpsClientMock); + } +} diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java index 94e7ccd7..d9c146f1 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java @@ -16,11 +16,11 @@ package org.onap.dcaegen2.collectors.datafile.model; +import com.google.gson.annotations.SerializedName; + import org.immutables.gson.Gson; import org.immutables.value.Value; -import com.google.gson.annotations.SerializedName; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -30,6 +30,9 @@ import com.google.gson.annotations.SerializedName; @Gson.TypeAdapters public interface ConsumerDmaapModel { + @SerializedName("name") + String getName(); + @SerializedName("location") String getLocation(); diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java index 103a70e8..62e6b1bf 100644 --- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java +++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java @@ -17,12 +17,14 @@ package org.onap.dcaegen2.collectors.datafile.model; public class ConsumerDmaapModelForUnitTest implements ConsumerDmaapModel { + private final String name; private final String location; private final String compression; private final String fileFormatType; private final String fileFormatVersion; public ConsumerDmaapModelForUnitTest() { + this.name = "A20161224.1030-1045.bin.gz"; this.location = "target/A20161224.1030-1045.bin.gz"; this.compression = "gzip"; this.fileFormatType = "org.3GPP.32.435#measCollec"; @@ -30,6 +32,11 @@ public class ConsumerDmaapModelForUnitTest implements ConsumerDmaapModel { } @Override + public String getName() { + return name; + } + + @Override public String getLocation() { return location; } diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java index 062724e7..f27e3b6c 100644 --- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java +++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java @@ -21,11 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.Test; class CommonFunctionsTest { - // Given private ConsumerDmaapModel model = new ConsumerDmaapModelForUnitTest(); private static final String EXPECTED_RESULT = - "{\"location\":\"target/A20161224.1030-1045.bin.gz\",\"compression\":\"gzip\"," - + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\",\"fileFormatVersion\":\"V10\"}"; + "{\"name\":\"A20161224.1030-1045.bin.gz\",\"location\":\"target/A20161224.1030-1045.bin.gz\"," + + "\"compression\":\"gzip\",\"fileFormatType\":\"org.3GPP.32.435#measCollec\"," + + "\"fileFormatVersion\":\"V10\"}"; @Test void createJsonBody_shouldReturnJsonInString() { diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java index 5b028973..e80670d3 100644 --- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java +++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test; public class ConsumerDmaapModelTest { + private static final String NAME = "A20161224.1030-1045.bin.gz"; private static final String LOCATION = "target/A20161224.1030-1045.bin.gz"; private static final String COMPRESSION = "gzip"; private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; @@ -29,13 +30,12 @@ public class ConsumerDmaapModelTest { @Test public void consumerDmaapModelBuilder_shouldBuildAnObject() { - // When - // Given - ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location(LOCATION).compression(COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); + ConsumerDmaapModel consumerDmaapModel = + ImmutableConsumerDmaapModel.builder().name(NAME).location(LOCATION).compression(COMPRESSION) + .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build(); - // Then Assertions.assertNotNull(consumerDmaapModel); + Assertions.assertEquals(NAME, consumerDmaapModel.getName()); Assertions.assertEquals(LOCATION, consumerDmaapModel.getLocation()); Assertions.assertEquals(COMPRESSION, consumerDmaapModel.getCompression()); Assertions.assertEquals(FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType()); diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java index 4b8ce08f..36050ff7 100644 --- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java +++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java @@ -21,7 +21,6 @@ import com.google.gson.JsonParser; import java.io.File; import java.net.URI; -import java.util.List; import org.apache.http.HttpHeaders; import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; @@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -47,7 +47,8 @@ import reactor.core.publisher.Mono; public class DmaapProducerReactiveHttpClient { private static final String X_ATT_DR_META = "X-ATT-DR-META"; - private static final String LOCATION = "location"; + private static final String NAME_JSON_TAG = "name"; + private static final String LOCATION_JSON_TAG = "location"; private static final String DEFAULT_FEED_ID = "1"; private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient { /** * Function for calling DMaaP HTTP producer - post request to DMaaP. * - * @param consumerDmaapModelMono - object which will be sent to DMaaP + * @param consumerDmaapModel - object which will be sent to DMaaP * @return status code of operation */ - public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) { - consumerDmaapModelMono.subscribe(models -> postFilesAndData(models)); - return Mono.just(HttpStatus.OK.toString()); - } + public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) { + logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel); - public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { - this.webClient = webClient; - return this; - } + RequestBodyUriSpec post = webClient.post(); - private void postFilesAndData(List<ConsumerDmaapModel> models) { - for (ConsumerDmaapModel consumerDmaapModel : models) { - postFileAndData(consumerDmaapModel); - } - } + prepareHead(consumerDmaapModel, post); - private void postFileAndData(ConsumerDmaapModel model) { - RequestBodyUriSpec post = webClient.post(); + prepareBody(consumerDmaapModel, post); - boolean headPrepared = prepareHead(model, post); + ResponseSpec responseSpec = post.retrieve(); + responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); + responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse)); + Flux<String> response = responseSpec.bodyToFlux(String.class); - if (headPrepared) { - prepareBody(model, post); + logger.trace("Exiting getDmaapProducerResponse with {}", response); + return response; + } - ResponseSpec responseSpec = post.retrieve(); - responseSpec.onStatus(HttpStatus::is4xxClientError, - clientResponse -> handlePostErrors(model, clientResponse)); - responseSpec.onStatus(HttpStatus::is5xxServerError, - clientResponse -> handlePostErrors(model, clientResponse)); - String bodyToMono = responseSpec.bodyToMono(String.class).block(); - logger.debug("File info sent to DR with response: " + bodyToMono); - } + public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) { + this.webClient = webClient; + return this; } - private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { - boolean result = true; + private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) { post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); - String location = metaData.getAsJsonObject().remove(LOCATION).getAsString(); + String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); post.header(X_ATT_DR_META, metaData.toString()); - post.uri(getUri(location)); - - return result; + post.uri(getUri(name)); } private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) { @@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient { post.body(BodyInserters.fromResource(httpResource)); } - private URI getUri(String location) { - String fileName = location.substring(location.indexOf("/"), location.length()); - String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; - URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName) - .port(dmaapPortNumber).path(path).build(); - return uri; + private URI getUri(String fileName) { + String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName; + return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber) + .path(path).build(); } private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) { diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java index c0dbf31b..5f4c1a58 100644 --- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java +++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java @@ -42,7 +42,8 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.util.DefaultUriBuilderFactory; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 @@ -52,6 +53,7 @@ class DmaapProducerReactiveHttpClientTest { private static final String FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String LOCATION_JSON_TAG = "location"; + private static final String NAME_JSON_TAG = "name"; private static final String X_ATT_DR_META = "X-ATT-DR-META"; private static final String HOST = "54.45.33.2"; @@ -98,11 +100,13 @@ class DmaapProducerReactiveHttpClientTest { List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>(); consumerDmaapModelList.add(consumerDmaapModel); - dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList)); + StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel)) + .expectNext("200").verifyComplete(); verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel)); metaData.getAsJsonObject().remove(LOCATION_JSON_TAG); + metaData.getAsJsonObject().remove(NAME_JSON_TAG); verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString()); URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT) .path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build(); @@ -116,7 +120,7 @@ class DmaapProducerReactiveHttpClientTest { when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock); when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock); - Mono<String> expectedResult = Mono.just("200"); - when(responseSpecMock.bodyToMono(String.class)).thenReturn(expectedResult); + Flux<String> expectedResult = Flux.just("200"); + when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult); } } |