From 79984d737c71d3c92f3cd283eaf2b9b6157c2ce2 Mon Sep 17 00:00:00 2001 From: wasala Date: Wed, 27 Jun 2018 14:29:06 +0200 Subject: Refactor Optional in MonoResponse Added JUnit tests for DmaapConsumer Reactive Client. Added property for spring to run PRH as none-web application Change-Id: I8b762389927151387da5b79e8b75b670600ee5e8 Issue-ID: DCAEGEN2-563 Signed-off-by: wasala --- .../prh/service/DmaapConsumerJsonParser.java | 20 ++++++++++---------- .../services/prh/tasks/DmaapConsumerTask.java | 4 ++-- .../services/prh/tasks/DmaapConsumerTaskImpl.java | 8 +++----- .../dcaegen2/services/prh/tasks/ScheduledTasks.java | 6 +++--- 4 files changed, 18 insertions(+), 20 deletions(-) (limited to 'prh-app-server/src/main/java') diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 22acf547..1d215c62 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; /** @@ -46,21 +47,20 @@ public class DmaapConsumerJsonParser { private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber"; - public Mono> getJsonObject(Mono> monoMessage) { + public Mono getJsonObject(Mono monoMessage) { return monoMessage.flatMap(message -> { - if (message.isPresent()) { - JsonElement jsonElement = new JsonParser().parse(message.orElse("")); - Optional consumerDmaapModel; + if (!StringUtils.isEmpty(message)) { + JsonElement jsonElement = new JsonParser().parse(message); + ConsumerDmaapModel consumerDmaapModel; try { if (jsonElement.isJsonObject()) { - consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject())); + consumerDmaapModel = create(jsonElement.getAsJsonObject()); } else { - consumerDmaapModel = Optional - .of(create( - StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new))); + consumerDmaapModel = create( + StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() + .flatMap(this::getJsonObjectFromAnArray) + .orElseThrow(DmaapEmptyResponseException::new)); } logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel); return Mono.just(consumerDmaapModel); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 753d1f9c..5cd30f8b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -30,11 +30,11 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono> consume(Mono> message) throws PrhTaskException; + abstract Mono consume(Mono message) throws PrhTaskException; abstract DmaapConsumerReactiveHttpClient resolveClient(); abstract void initConfigs(); - protected abstract Mono> execute(String object) throws PrhTaskException; + protected abstract Mono execute(String object) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 3181c069..08008f0a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -40,7 +40,6 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; @@ -57,17 +56,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; } - @Override - Mono> consume(Mono> message) throws PrhTaskException { + Mono consume(Mono message) { logger.info("Consumed model from DmaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message); } @Override - public Mono> execute(String object) throws PrhTaskException { + public Mono execute(String object) { dmaapConsumerReactiveHttpClient = resolveClient(); -// dmaapConsumerReactiveHttpClient.initWebClient(); + dmaapConsumerReactiveHttpClient.initWebClient(); logger.trace("Method called with arg {}", object); return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse())); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6fa986e4..e161e3c5 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -77,7 +77,7 @@ public class ScheduledTasks { } } - private Callable>> consumeFromDMaaPMessage() { + private Callable> consumeFromDMaaPMessage() { return () -> { dmaapConsumerTask.initConfigs(); @@ -85,10 +85,10 @@ public class ScheduledTasks { }; } - private Mono publishToAAIConfiguration(Mono> monoDMaaPModel) { + private Mono publishToAAIConfiguration(Mono monoDMaaPModel) { return monoDMaaPModel.flatMap(dmaapModel -> { try { - return Mono.just(aaiProducerTask.execute(dmaapModel.get())); + return Mono.just(aaiProducerTask.execute(dmaapModel)); } catch (PrhTaskException e) { logger.warn("Exception in A&AIProducer task ", e); return Mono.error(e); -- cgit 1.2.3-korg