aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-27 14:29:06 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-07 09:18:01 +0200
commit79984d737c71d3c92f3cd283eaf2b9b6157c2ce2 (patch)
treed467284687fa9db1ee15ead8a2f94594ec609a05 /prh-app-server/src/main/java/org
parentc8c9a242f7a1f8454e2cf94b0442128533569dc5 (diff)
Refactor Optional in MonoResponse
Added JUnit tests for DmaapConsumer Reactive Client. Added property for spring to run PRH as none-web application Change-Id: I8b762389927151387da5b79e8b75b670600ee5e8 Issue-ID: DCAEGEN2-563 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java20
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java6
4 files changed, 18 insertions, 20 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 22acf547..1d215c62 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
/**
@@ -46,21 +47,20 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Mono<Optional<ConsumerDmaapModel>> getJsonObject(Mono<Optional<String>> monoMessage) {
+ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage.flatMap(message ->
{
- if (message.isPresent()) {
- JsonElement jsonElement = new JsonParser().parse(message.orElse(""));
- Optional<ConsumerDmaapModel> consumerDmaapModel;
+ if (!StringUtils.isEmpty(message)) {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel;
try {
if (jsonElement.isJsonObject()) {
- consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
+ consumerDmaapModel = create(jsonElement.getAsJsonObject());
} else {
- consumerDmaapModel = Optional
- .of(create(
- StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
+ consumerDmaapModel = create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
}
logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
return Mono.just(consumerDmaapModel);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 753d1f9c..5cd30f8b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -30,11 +30,11 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
abstract DmaapConsumerReactiveHttpClient resolveClient();
abstract void initConfigs();
- protected abstract Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 3181c069..08008f0a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -40,7 +40,6 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -57,17 +56,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
}
-
@Override
- Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException {
+ Mono<ConsumerDmaapModel> consume(Mono<String> message) {
logger.info("Consumed model from DmaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
- public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException {
+ public Mono<ConsumerDmaapModel> execute(String object) {
dmaapConsumerReactiveHttpClient = resolveClient();
-// dmaapConsumerReactiveHttpClient.initWebClient();
+ dmaapConsumerReactiveHttpClient.initWebClient();
logger.trace("Method called with arg {}", object);
return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6fa986e4..e161e3c5 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -77,7 +77,7 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<Optional<ConsumerDmaapModel>>> consumeFromDMaaPMessage() {
+ private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
return () ->
{
dmaapConsumerTask.initConfigs();
@@ -85,10 +85,10 @@ public class ScheduledTasks {
};
}
- private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<Optional<ConsumerDmaapModel>> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
return monoDMaaPModel.flatMap(dmaapModel -> {
try {
- return Mono.just(aaiProducerTask.execute(dmaapModel.get()));
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
} catch (PrhTaskException e) {
logger.warn("Exception in A&AIProducer task ", e);
return Mono.error(e);