summaryrefslogtreecommitdiffstats
path: root/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java')
-rw-r--r--components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java16
1 files changed, 2 insertions, 14 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
index 947d7a7c..9fe0c277 100644
--- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
+++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
@@ -32,7 +32,6 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
@@ -72,20 +71,9 @@ public class ReRegistrationDmaapConsumerJsonParser {
* @param dmaapResponse Response from DMaaP
* @return Re-Registration Consumer DMaaP reactive model
*/
- public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {