summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVijay Venkatesh Kumar <vv770d@att.com>2018-08-06 13:24:37 +0000
committerGerrit Code Review <gerrit@onap.org>2018-08-06 13:24:37 +0000
commit0dca38ccb30535a97dcbe2581edbc65af37983a7 (patch)
tree188f95fe63c14971c4bed3456034c9edef180035
parent4b86a6c43440adfc8da9065d700a1fcfea880c23 (diff)
parent58cbfef5661242a2523b7a183a664498fd1f405a (diff)
Merge "Cleaned in code in reactive tasks"
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java3
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java6
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java64
7 files changed, 61 insertions, 29 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index a62321ca..ee42ce4a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -46,8 +46,7 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Optional<ConsumerDmaapModel> getJsonObject(String message)
- throws PrhTaskException {
+ public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException {
JsonElement jsonElement = new JsonParser().parse(message);
Optional<ConsumerDmaapModel> consumerDmaapModel;
if (jsonElement.isJsonObject()) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
index df8330f4..1bb28504 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
@@ -31,5 +31,5 @@ public abstract class AAIConsumerTask {
abstract AAIConsumerClient resolveClient();
- abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
+ protected abstract String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
index abd04640..4a763ef3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
@@ -27,11 +27,11 @@ import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ {
+public abstract class AAIProducerTask {
abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException;
abstract AAIProducerClient resolveClient();
- abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+ protected abstract ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 56b678a3..1be3b28d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
+abstract class DmaapConsumerTask {
abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
@@ -34,5 +34,5 @@ abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
abstract void initConfigs();
- abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException;
+ protected abstract ConsumerDmaapModel execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index e72939cf..3944d416 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -58,10 +58,9 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
ConsumerDmaapModel consume(String message) throws PrhTaskException {
- logger.info("Consumed model from DmaaP: {}", message);
+ logger.info("Consumed model from DMaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message)
- .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust"));
-
+ .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request"));
}
@Override
@@ -69,7 +68,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
extendedDmaapConsumerHttpClient = resolveClient();
logger.trace("Method called with arg {}", object);
return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() ->
- new PrhTaskException("DmaapConsumerTask has returned null"))));
+ new PrhTaskException("DMaaPConsumerTask has returned null"))));
}
@Override
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index bd9a8744..3520d134 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -19,7 +19,7 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
@@ -28,9 +28,9 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp
*/
abstract class DmaapPublisherTask {
- abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
+ abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
- abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
+ protected abstract Integer execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index addeaae2..cf096b7b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -19,12 +19,16 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.util.concurrent.Callable;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.core.Disposable;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -49,22 +53,52 @@ public class ScheduledTasks {
public void scheduleMainPrhEventTask() {
logger.trace("Execution of tasks was registered");
- Mono.fromSupplier(() -> Mono.fromCallable(() ->
+ Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
+ .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
+ .flatMap(this::publishToAAIConfiguration)
+ .flatMap(this::publishToDMaaPConfiguration)
+ .subscribeOn(Schedulers.elastic());
+
+ dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ private void onComplete() {
+ logger.info("PRH tasks have been completed");
+ }
+
+ private void onSuccess(Integer responseCode) {
+ logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ }
+
+ private void onError(Throwable throwable) {
+ if (!(throwable instanceof DmaapEmptyResponseException)) {
+ logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+ }
+ }
+
+ private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return () ->
{
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- }).subscribe(consumerDmaapModel -> Mono
- .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
- .subscribe(
- aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
- .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
- error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
- () -> logger.info("Completed DmaapPublisher task"))),
- errorResponse -> logger
- .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
- , () -> logger.info("Completed AAIProducer task")))
- .subscribe(Disposable::dispose, tasksError -> logger
- .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
- , () -> logger.info("PRH tasks was consumed properly")).dispose();
+ };
+ }
+
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
+ try {
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in A&AIProducer task ", e);
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
+ try {
+ return Mono.just(dmaapProducerTask.execute(aaiModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
}
}