aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java46
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java10
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java8
7 files changed, 41 insertions, 50 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 1d215c62..7516853e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -48,28 +48,30 @@ public class DmaapConsumerJsonParser {
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
- return monoMessage.flatMap(message ->
- {
- if (!StringUtils.isEmpty(message)) {
- JsonElement jsonElement = new JsonParser().parse(message);
- ConsumerDmaapModel consumerDmaapModel;
- try {
- if (jsonElement.isJsonObject()) {
- consumerDmaapModel = create(jsonElement.getAsJsonObject());
- } else {
- consumerDmaapModel = create(
- StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new));
- }
- logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
- return Mono.just(consumerDmaapModel);
- } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
- return Mono.error(e);
- }
- }
- return Mono.error(new DmaapEmptyResponseException());
- });
+ return monoMessage
+ .flatMap(message -> (StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
+ : convertJsonToConsumerDmaapModel(message)));
+ }
+
+ private Mono<? extends ConsumerDmaapModel> convertJsonToConsumerDmaapModel(String message) {
+ try {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel = jsonElement.isJsonObject() ?
+ create(jsonElement.getAsJsonObject()) :
+ getConsumerDmaapModelFromJsonArray(jsonElement);
+ logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+ return Mono.just(consumerDmaapModel);
+ } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModelFromJsonArray(JsonElement jsonElement)
+ throws DmaapNotFoundException, DmaapEmptyResponseException {
+ return create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
index 005d08d1..b12fb5bb 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
@@ -26,8 +26,8 @@ import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
-import org.onap.dcaegen2.services.prh.service.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 8df564d2..93c287b4 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -43,10 +43,6 @@ abstract class DmaapConsumerTask {
protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
WebClient buildWebClient() {
- DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration();
- return new DMaaPReactiveWebClient.WebClientBuilder()
- .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType())
- .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName())
- .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build();
+ return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 8c74bac3..45709aa2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -19,7 +19,6 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
-import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
@@ -79,8 +78,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
DMaaPConsumerReactiveHttpClient resolveClient() {
- return Optional.ofNullable(dMaaPConsumerReactiveHttpClient)
- .orElseGet(() -> new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(
- buildWebClient()));
+ return dMaaPConsumerReactiveHttpClient == null
+ ? new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient())
+ : dMaaPConsumerReactiveHttpClient;
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index af8d14a3..f559683d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -32,19 +32,15 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapPublisherTask {
- abstract Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
abstract DMaaPProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
WebClient buildWebClient() {
- DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration();
- return new DMaaPReactiveWebClient.WebClientBuilder()
- .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType())
- .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName())
- .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build();
+ return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 11281d81..673e00f3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -48,14 +48,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
+ Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
consumerDmaapModel);
- return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel).map(Integer::parseInt);
+ return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
.orElseThrow(() -> new DmaapNotFoundException("Invoked null object to DMaaP task"));
dMaaPProducerReactiveHttpClient = resolveClient();
@@ -70,8 +70,8 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
@Override
DMaaPProducerReactiveHttpClient resolveClient() {
- return Optional.ofNullable(dMaaPProducerReactiveHttpClient)
- .orElseGet(() -> new DMaaPProducerReactiveHttpClient(resolveConfiguration())
- .createDMaaPWebClient(buildWebClient()));
+ return dMaaPProducerReactiveHttpClient == null
+ ? new DMaaPProducerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient())
+ : dMaaPProducerReactiveHttpClient;
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 2787e64b..365552b7 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -54,7 +54,7 @@ public class ScheduledTasks {
public void scheduleMainPrhEventTask() {
logger.trace("Execution of tasks was registered");
- Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
+ Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
.doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
.map(this::publishToAAIConfiguration)
.flatMap(this::publishToDMaaPConfiguration)
@@ -67,7 +67,7 @@ public class ScheduledTasks {
logger.info("PRH tasks have been completed");
}
- private void onSuccess(Integer responseCode) {
+ private void onSuccess(String responseCode) {
logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
}
@@ -90,17 +90,15 @@ public class ScheduledTasks {
try {
return Mono.just(aaiProducerTask.execute(dmaapModel));
} catch (PrhTaskException e) {
- logger.warn("Exception in A&AIProducer task ", e);
return Mono.error(e);
}
});
}
- private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
+ private Mono<String> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
try {
return dmaapProducerTask.execute(monoAAIModel);
} catch (PrhTaskException e) {
- logger.warn("Exception in DMaaPProducer task ", e);
return Mono.error(e);
}
}