diff options
author | Vijay Venkatesh Kumar <vv770d@att.com> | 2018-08-06 13:24:37 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-08-06 13:24:37 +0000 |
commit | 0dca38ccb30535a97dcbe2581edbc65af37983a7 (patch) | |
tree | 188f95fe63c14971c4bed3456034c9edef180035 /prh-app-server | |
parent | 4b86a6c43440adfc8da9065d700a1fcfea880c23 (diff) | |
parent | 58cbfef5661242a2523b7a183a664498fd1f405a (diff) |
Merge "Cleaned in code in reactive tasks"
Diffstat (limited to 'prh-app-server')
7 files changed, 61 insertions, 29 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index a62321ca..ee42ce4a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -46,8 +46,7 @@ public class DmaapConsumerJsonParser { private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber"; - public Optional<ConsumerDmaapModel> getJsonObject(String message) - throws PrhTaskException { + public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException { JsonElement jsonElement = new JsonParser().parse(message); Optional<ConsumerDmaapModel> consumerDmaapModel; if (jsonElement.isJsonObject()) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java index df8330f4..1bb28504 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java @@ -31,5 +31,5 @@ public abstract class AAIConsumerTask { abstract AAIConsumerClient resolveClient(); - abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException; + protected abstract String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java index abd04640..4a763ef3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java @@ -27,11 +27,11 @@ import org.onap.dcaegen2.services.prh.service.AAIProducerClient; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ -public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ { +public abstract class AAIProducerTask { abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException; abstract AAIProducerClient resolveClient(); - abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; + protected abstract ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 56b678a3..1be3b28d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -26,7 +26,7 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ -abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ { +abstract class DmaapConsumerTask { abstract ConsumerDmaapModel consume(String message) throws PrhTaskException; @@ -34,5 +34,5 @@ abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ { abstract void initConfigs(); - abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException; + protected abstract ConsumerDmaapModel execute(String object) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index e72939cf..3944d416 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -58,10 +58,9 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override ConsumerDmaapModel consume(String message) throws PrhTaskException { - logger.info("Consumed model from DmaaP: {}", message); + logger.info("Consumed model from DMaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message) - .orElseThrow(() -> new DmaapNotFoundException("Null response from JSONObject in single reqeust")); - + .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request")); } @Override @@ -69,7 +68,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { extendedDmaapConsumerHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() -> - new PrhTaskException("DmaapConsumerTask has returned null")))); + new PrhTaskException("DMaaPConsumerTask has returned null")))); } @Override diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index bd9a8744..3520d134 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -19,7 +19,7 @@ */ package org.onap.dcaegen2.services.prh.tasks; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl; @@ -28,9 +28,9 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp */ abstract class DmaapPublisherTask { - abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException; + abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; abstract ExtendedDmaapProducerHttpClientImpl resolveClient(); - abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException; + protected abstract Integer execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index addeaae2..cf096b7b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -19,12 +19,16 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import java.util.concurrent.Callable; +import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -49,22 +53,52 @@ public class ScheduledTasks { public void scheduleMainPrhEventTask() { logger.trace("Execution of tasks was registered"); - Mono.fromSupplier(() -> Mono.fromCallable(() -> + Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) + .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) + .flatMap(this::publishToAAIConfiguration) + .flatMap(this::publishToDMaaPConfiguration) + .subscribeOn(Schedulers.elastic()); + + dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + } + + private void onComplete() { + logger.info("PRH tasks have been completed"); + } + + private void onSuccess(Integer responseCode) { + logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + } + + private void onError(Throwable throwable) { + if (!(throwable instanceof DmaapEmptyResponseException)) { + logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); + } + } + + private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return () -> { dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }).subscribe(consumerDmaapModel -> Mono - .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel)) - .subscribe( - aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel)) - .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp), - error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error), - () -> logger.info("Completed DmaapPublisher task"))), - errorResponse -> logger - .warn("Error has been thrown in AAIProducerTask: {}", errorResponse) - , () -> logger.info("Completed AAIProducer task"))) - .subscribe(Disposable::dispose, tasksError -> logger - .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError) - , () -> logger.info("PRH tasks was consumed properly")).dispose(); + }; + } + + private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) { + try { + return Mono.just(aaiProducerTask.execute(dmaapModel)); + } catch (PrhTaskException e) { + logger.warn("Exception in A&AIProducer task ", e); + return Mono.error(e); + } + } + + private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) { + try { + return Mono.just(dmaapProducerTask.execute(aaiModel)); + } catch (PrhTaskException e) { + logger.warn("Exception in DMaaPProducer task ", e); + return Mono.error(e); + } } } |