aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java28
3 files changed, 29 insertions, 30 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3c606deb..a8f79ea1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -123,12 +123,11 @@ public class JsonMessageParser {
}
private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
- return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transformMessages(monoJsonP));
+ return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
+ : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
@@ -138,22 +137,22 @@ public class JsonMessageParser {
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
// @formatter:off
- return Flux.just(ImmutableFileReadyMessage.builder()
+ return Mono.just(ImmutableFileReadyMessage.builder()
.pnfName(messageMetaData.sourceName())
.messageMetaData(messageMetaData)
.files(allFileDataFromJson)
.build());
// @formatter:on
} else {
- return Flux.empty();
+ return Mono.empty();
}
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 338c8323..4c0dcce5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -50,27 +50,27 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
- return Flux.just(model)
- .cache(1)
+ return Mono.just(model)
+ .cache()
.flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
- return Flux.just(model);
+ return Mono.just(model);
} else {
- logger.warn("Publish to DR unsuccessful, response code: " + response);
- return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ logger.warn("Publish to DR unsuccessful, response code: {}", response);
+ return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index f22c7bf9..37b7a559 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -98,7 +98,7 @@ public class ScheduledTasks {
.doOnNext(fileData -> taskCounter.incrementAndGet())
.flatMap(this::collectFileFromXnf)
.flatMap(this::publishToDataRouter)
- .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
.doOnNext(model -> taskCounter.decrementAndGet())
.sequential()
.subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -109,8 +109,8 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(Path localFile) {
- logger.info("Datafile consumed tasks." + localFile);
+ private void onSuccess(ConsumerDmaapModel model) {
+ logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
private void onError(Throwable throwable) {
@@ -138,18 +138,19 @@ public class ScheduledTasks {
return fileCollect.collectorTask
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
- logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
- deleteFile(fileData.getLocalFileName());
- alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ Path localFileName = fileData.getLocalFileName();
+ logger.error("File fetching failed: {}", localFileName);
+ deleteFile(localFileName);
+ alreadyPublishedFiles.remove(localFileName);
taskCounter.decrementAndGet();
return Mono.empty();
}
- private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
@@ -160,13 +161,13 @@ public class ScheduledTasks {
}
- private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
taskCounter.decrementAndGet();
- return Flux.empty();
+ return Mono.empty();
}
private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
@@ -179,7 +180,7 @@ public class ScheduledTasks {
final DMaaPMessageConsumerTask messageConsumerTask =
new DMaaPMessageConsumerTask(this.applicationConfiguration);
return messageConsumerTask.execute()
- .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ .onErrorResume(this::handleConsumeMessageFailure);
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
@@ -187,13 +188,12 @@ public class ScheduledTasks {
return Flux.empty();
}
- private Flux<Path> deleteFile(Path localFile) {
+ private void deleteFile(Path localFile) {
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
} catch (Exception e) {
logger.warn("Could not delete file: {}, {}", localFile, e);
}
- return Flux.just(localFile);
}
}