aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
diff options
context:
space:
mode:
authorpwielebs <piotr.wielebski@nokia.com>2019-05-16 17:44:45 +0200
committerpwielebs <piotr.wielebski@nokia.com>2019-05-22 14:01:54 +0200
commit2cf649dda43c7fc7650b5d0047ccc57108918724 (patch)
tree03d07378786376e077f7d95a6a98c4f66ab85719 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
parenta4f457e46a336a30ceea69a742e8b8aa8f2e720f (diff)
Align PRH to El Alto SDK
Change-Id: I65c445d76092e11084fb60c68740e1321b35708c Issue-ID: DCAEGEN2-1501 Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java67
1 files changed, 28 insertions, 39 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 43d6922a..b3d84562 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -20,6 +20,23 @@
package org.onap.dcaegen2.services.prh.service;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS;
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER;
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT;
@@ -37,21 +54,6 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIA
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME;
import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.StringUtils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -77,34 +79,21 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonElement> monoMessage) {
- return monoMessage
- .flatMapMany(this::getConsumerDmaapModelFromJsonArray);
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) {
+ return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items()));
}
- private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
- LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement);
-
- if (jsonElement instanceof JsonObject) {
- LOGGER.debug("Element is JsonObject");
- return create(Flux.just((JsonObject) jsonElement));
- }
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray items) {
+ LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items);
- if (jsonElement instanceof JsonArray) {
- LOGGER.debug("Element is JsonArray");
- JsonArray jsonArray = (JsonArray) jsonElement;
- if (jsonArray.size() == 0) {
- LOGGER.debug("Nothing to consume from DMaaP");
- return Flux.empty();
- }
- return create(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false)
- .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
- .orElseGet(JsonObject::new)))));
+ if (items.size() == 0) {
+ LOGGER.debug("Nothing to consume from DMaaP");
+ return Flux.empty();
}
-
- LOGGER.warn("Element is neither JSON Object or Array");
- return Flux.empty();
+ return create(
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {