diff options
Diffstat (limited to 'datafile-app-server/src/main')
3 files changed, 29 insertions, 30 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3c606deb..a8f79ea1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -123,12 +123,11 @@ public class JsonMessageParser { } private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transformMessages(monoJsonP)); + return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) + : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Flux<FileReadyMessage> transformMessages(JsonObject message) { + private Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); @@ -138,22 +137,22 @@ public class JsonMessageParser { if (!allFileDataFromJson.isEmpty()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); // @formatter:off - return Flux.just(ImmutableFileReadyMessage.builder() + return Mono.just(ImmutableFileReadyMessage.builder() .pnfName(messageMetaData.sourceName()) .messageMetaData(messageMetaData) .files(allFileDataFromJson) .build()); // @formatter:on } else { - return Flux.empty(); + return Mono.empty(); } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); - return Flux.empty(); + return Mono.empty(); } logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); - return Flux.empty(); + return Mono.empty(); } private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 338c8323..4c0dcce5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -50,27 +50,27 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off - return Flux.just(model) - .cache(1) + return Mono.just(model) + .cache() .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Flux.just(model); + return Mono.just(model); } else { - logger.warn("Publish to DR unsuccessful, response code: " + response); - return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + logger.warn("Publish to DR unsuccessful, response code: {}", response); + return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index f22c7bf9..37b7a559 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -98,7 +98,7 @@ public class ScheduledTasks { .doOnNext(fileData -> taskCounter.incrementAndGet()) .flatMap(this::collectFileFromXnf) .flatMap(this::publishToDataRouter) - .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) .doOnNext(model -> taskCounter.decrementAndGet()) .sequential() .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -109,8 +109,8 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(Path localFile) { - logger.info("Datafile consumed tasks." + localFile); + private void onSuccess(ConsumerDmaapModel model) { + logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } private void onError(Throwable throwable) { @@ -138,18 +138,19 @@ public class ScheduledTasks { return fileCollect.collectorTask .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { - logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); - deleteFile(fileData.getLocalFileName()); - alreadyPublishedFiles.remove(fileData.getLocalFileName()); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) { + Path localFileName = fileData.getLocalFileName(); + logger.error("File fetching failed: {}", localFileName); + deleteFile(localFileName); + alreadyPublishedFiles.remove(localFileName); taskCounter.decrementAndGet(); return Mono.empty(); } - private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); @@ -160,13 +161,13 @@ public class ScheduledTasks { } - private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); taskCounter.decrementAndGet(); - return Flux.empty(); + return Mono.empty(); } private Flux<FileReadyMessage> consumeMessagesFromDmaap() { @@ -179,7 +180,7 @@ public class ScheduledTasks { final DMaaPMessageConsumerTask messageConsumerTask = new DMaaPMessageConsumerTask(this.applicationConfiguration); return messageConsumerTask.execute() - .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + .onErrorResume(this::handleConsumeMessageFailure); } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { @@ -187,13 +188,12 @@ public class ScheduledTasks { return Flux.empty(); } - private Flux<Path> deleteFile(Path localFile) { + private void deleteFile(Path localFile) { logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); } catch (Exception e) { logger.warn("Could not delete file: {}, {}", localFile, e); } - return Flux.just(localFile); } } |