aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java14
1 files changed, 5 insertions, 9 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 8742d872..a5ecc1dd 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
@@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser {
private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
private static final String SOURCE_NAME = "sourceName";
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
*
@@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser {
*/
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
.flatMap(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
private Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
- : Mono.fromSupplier(() -> new JsonParser().parse(message));
+ : Mono.fromCallable(() -> new JsonParser().parse(message));
}
private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+ ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}