aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java23
1 files changed, 5 insertions, 18 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 00a6d465..a69b7c54 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.service;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@@ -77,26 +78,14 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonArray> monoMessage) {
return monoMessage
- .flatMapMany(this::getJsonParserMessage)
- .flatMap(this::createJsonConsumerModel);
+ .flatMapMany(this::getConsumerDmaapModelFromJsonArray);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty")
- : Mono.fromCallable(() -> new JsonParser().parse(message));
- }
-
- private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject()
- ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getConsumerDmaapModelFromJsonArray(jsonElement);
- }
-
- private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray jsonElement) {
return create(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
@@ -115,7 +104,6 @@ public class DmaapConsumerJsonParser {
}
private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {
-
JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT)
.getAsJsonObject(COMMON_EVENT_HEADER);
JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT)
@@ -123,7 +111,6 @@ public class DmaapConsumerJsonParser {
this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE);
-
this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS);
this.pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS);
this.pnfSerialNumberOptionalField = getValueFromJson(pnfRegistrationFields, SERIAL_NUMBER);