diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java | 14 |
1 files changed, 5 insertions, 9 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 8742d872..a5ecc1dd 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ @@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser { private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; private static final String SOURCE_NAME = "sourceName"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - /** * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}. * @@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser { */ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { return monoMessage - .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message)) .flatMap(this::getJsonParserMessage) .flatMap(this::createJsonConsumerModel); } private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromCallable(() -> new JsonParser().parse(message)); } private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() - ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) + ? create(Mono.fromCallable(jsonElement::getAsJsonObject)) : getConsumerDmaapModelFromJsonArray(jsonElement); } |