From f88fc3ab6f40b5e9ad102aa3142606ff14347b5e Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 22 Feb 2019 13:51:34 +0000 Subject: Fixed some sonar issues Changed some Flux to Mono. Removed some obfuscating wrappers. Issue-ID: DCAEGEN2-1118 Change-Id: I76dcaea7c69608cf404389fad93f7539f735aad2 Signed-off-by: PatrikBuhr --- .../datafile/service/JsonMessageParser.java | 15 ++++++------ .../datafile/tasks/DataRouterPublisher.java | 16 ++++++------- .../collectors/datafile/tasks/ScheduledTasks.java | 28 +++++++++++----------- 3 files changed, 29 insertions(+), 30 deletions(-) (limited to 'datafile-app-server/src/main') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3c606deb..a8f79ea1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -123,12 +123,11 @@ public class JsonMessageParser { } private Flux createMessages(Flux jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transformMessages(monoJsonP)); + return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) + : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Flux transformMessages(JsonObject message) { + private Mono transformMessages(JsonObject message) { Optional optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); @@ -138,22 +137,22 @@ public class JsonMessageParser { if (!allFileDataFromJson.isEmpty()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); // @formatter:off - return Flux.just(ImmutableFileReadyMessage.builder() + return Mono.just(ImmutableFileReadyMessage.builder() .pnfName(messageMetaData.sourceName()) .messageMetaData(messageMetaData) .files(allFileDataFromJson) .build()); // @formatter:on } else { - return Flux.empty(); + return Mono.empty(); } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); - return Flux.empty(); + return Mono.empty(); } logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); - return Flux.empty(); + return Mono.empty(); } private Optional getMessageMetaData(JsonObject message) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 338c8323..4c0dcce5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Przemysław Wąsala on 4/13/18 @@ -50,27 +50,27 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Flux execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off - return Flux.just(model) - .cache(1) + return Mono.just(model) + .cache() .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Flux handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + private Mono handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Flux.just(model); + return Mono.just(model); } else { - logger.warn("Publish to DR unsuccessful, response code: " + response); - return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + logger.warn("Publish to DR unsuccessful, response code: {}", response); + return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index f22c7bf9..37b7a559 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -98,7 +98,7 @@ public class ScheduledTasks { .doOnNext(fileData -> taskCounter.incrementAndGet()) .flatMap(this::collectFileFromXnf) .flatMap(this::publishToDataRouter) - .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) .doOnNext(model -> taskCounter.decrementAndGet()) .sequential() .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -109,8 +109,8 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(Path localFile) { - logger.info("Datafile consumed tasks." + localFile); + private void onSuccess(ConsumerDmaapModel model) { + logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } private void onError(Throwable throwable) { @@ -138,18 +138,19 @@ public class ScheduledTasks { return fileCollect.collectorTask .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); } - private Mono handleCollectFailure(FileData fileData, Throwable exception) { - logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); - deleteFile(fileData.getLocalFileName()); - alreadyPublishedFiles.remove(fileData.getLocalFileName()); + private Mono handleCollectFailure(FileData fileData) { + Path localFileName = fileData.getLocalFileName(); + logger.error("File fetching failed: {}", localFileName); + deleteFile(localFileName); + alreadyPublishedFiles.remove(localFileName); taskCounter.decrementAndGet(); return Mono.empty(); } - private Flux publishToDataRouter(ConsumerDmaapModel model) { + private Mono publishToDataRouter(ConsumerDmaapModel model) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); @@ -160,13 +161,13 @@ public class ScheduledTasks { } - private Flux handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); taskCounter.decrementAndGet(); - return Flux.empty(); + return Mono.empty(); } private Flux consumeMessagesFromDmaap() { @@ -179,7 +180,7 @@ public class ScheduledTasks { final DMaaPMessageConsumerTask messageConsumerTask = new DMaaPMessageConsumerTask(this.applicationConfiguration); return messageConsumerTask.execute() - .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + .onErrorResume(this::handleConsumeMessageFailure); } private Flux handleConsumeMessageFailure(Throwable exception) { @@ -187,13 +188,12 @@ public class ScheduledTasks { return Flux.empty(); } - private Flux deleteFile(Path localFile) { + private void deleteFile(Path localFile) { logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); } catch (Exception e) { logger.warn("Could not delete file: {}, {}", localFile, e); } - return Flux.just(localFile); } } -- cgit 1.2.3-korg