aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-26 15:15:03 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-07 08:21:02 +0200
commit8b1502fb0f1af5d00ec26e712e57b792fbd16bd8 (patch)
tree7cc80c278f17710863e6d865df77c5edfa6d4fbc /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
parente17c2d89d0470501fa60ed487726b0bbf3305f8c (diff)
Added dmaapReactiveConsumer
*Tests have not been ready yet Change-Id: I2e1d9c4218f91ae2f066b28acdbaa1870d7d27e7 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java40
1 files changed, 25 insertions, 15 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index ee42ce4a..20ec78fc 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -26,11 +26,12 @@ import java.util.Optional;
import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -46,19 +47,29 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException {
- JsonElement jsonElement = new JsonParser().parse(message);
- Optional<ConsumerDmaapModel> consumerDmaapModel;
- if (jsonElement.isJsonObject()) {
- consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
- } else {
- consumerDmaapModel = Optional
- .of(create(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
- }
- logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
- return consumerDmaapModel;
+ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ return monoMessage.flatMap(message ->
+ {
+ if (!StringUtils.isEmpty(message)) {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel;
+ try {
+ if (jsonElement.isJsonObject()) {
+ consumerDmaapModel = create(jsonElement.getAsJsonObject());
+ } else {
+ consumerDmaapModel = create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
+ }
+ logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+ return Mono.just(consumerDmaapModel);
+ } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
+ return Mono.error(e);
+ }
+ }
+ return Mono.error(new DmaapEmptyResponseException());
+ });
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -101,5 +112,4 @@ public class DmaapConsumerJsonParser {
private boolean containsHeader(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS);
}
-
}