From 2cfcc6756e59ed8cda571efa8b29764eab7837c8 Mon Sep 17 00:00:00 2001 From: wasala Date: Fri, 22 Jun 2018 20:11:20 +0200 Subject: Added reactive tasks flow control Change-Id: I9cb2bede66e9e446912f2e6a815c7b56b80813b9 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala --- .../services/prh/tasks/AAIConsumerTask.java | 4 ++- .../services/prh/tasks/AAIConsumerTaskImpl.java | 13 +------ .../services/prh/tasks/AAIProducerTask.java | 5 ++- .../services/prh/tasks/AAIProducerTaskImpl.java | 16 ++------- .../services/prh/tasks/DmaapConsumerTask.java | 4 ++- .../services/prh/tasks/DmaapConsumerTaskImpl.java | 19 +--------- .../services/prh/tasks/DmaapPublisherTask.java | 4 ++- .../services/prh/tasks/DmaapPublisherTaskImpl.java | 13 +------ .../services/prh/tasks/ScheduledTasks.java | 34 ++++++++++-------- .../org/onap/dcaegen2/services/prh/tasks/Task.java | 42 ---------------------- 10 files changed, 39 insertions(+), 115 deletions(-) delete mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java (limited to 'prh-app-server/src/main/java/org') diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java index 784bc5af..df8330f4 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java @@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIConsumerClient; -public abstract class AAIConsumerTask extends Task { +public abstract class AAIConsumerTask { abstract Optional consume(ConsumerDmaapModel message) throws AAINotFoundException; abstract AAIConsumerClient resolveClient(); + + abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java index 4c35b2ee..c545a1be 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java @@ -25,7 +25,6 @@ import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIConsumerClient; import org.slf4j.Logger; @@ -34,8 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class AAIConsumerTaskImpl extends - AAIConsumerTask { +public class AAIConsumerTaskImpl extends AAIConsumerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -58,14 +56,6 @@ public class AAIConsumerTaskImpl extends } } - @Override - protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException { - String response = execute(body); - if (taskProcess != null && response != null && !response.isEmpty()) { - taskProcess.receiveRequest(response); - } - } - @Override public String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) @@ -75,7 +65,6 @@ public class AAIConsumerTaskImpl extends return consume(consumerDmaapModel).orElseThrow(() -> new AAINotFoundException("Null response code")); } - @Override protected AAIClientConfiguration resolveConfiguration() { return prhAppConfig.getAAIClientConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java index c7bde032..abd04640 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java @@ -20,15 +20,18 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIProducerClient; /** * @author Przemysław Wąsala on 4/13/18 */ -public abstract class AAIProducerTask extends Task { +public abstract class AAIProducerTask/* extends Task */ { abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException; abstract AAIProducerClient resolveClient(); + + abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java index b637bb29..005d08d1 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java @@ -19,11 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import java.net.URISyntaxException; +import java.util.Optional; import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIProducerClient; import org.onap.dcaegen2.services.prh.service.HttpUtils; @@ -32,15 +33,12 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.net.URISyntaxException; -import java.util.Optional; - /** * @author Przemysław Wąsala on 4/13/18 */ @Component public class AAIProducerTaskImpl extends - AAIProducerTask { + AAIProducerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -65,14 +63,6 @@ public class AAIProducerTaskImpl extends } } - @Override - protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException { - ConsumerDmaapModel response = execute(body); - if (taskProcess != null && response != null) { - taskProcess.receiveRequest(response); - } - } - @Override public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 3e36bcdd..56b678a3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -26,11 +26,13 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp /** * @author Przemysław Wąsala on 4/13/18 */ -abstract class DmaapConsumerTask extends Task { +abstract class DmaapConsumerTask /* extends Task*/ { abstract ConsumerDmaapModel consume(String message) throws PrhTaskException; abstract ExtendedDmaapConsumerHttpClientImpl resolveClient(); abstract void initConfigs(); + + abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 43eb9eaa..e72939cf 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -23,7 +23,6 @@ import java.util.Optional; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; -import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -38,8 +37,7 @@ import org.springframework.stereotype.Component; * @author Przemysław Wąsala on 3/23/18 */ @Component -public class DmaapConsumerTaskImpl extends - DmaapConsumerTask { +public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; @@ -66,20 +64,6 @@ public class DmaapConsumerTaskImpl extends } - @Override - protected void receiveRequest(String body) throws PrhTaskException { - try { - ConsumerDmaapModel response = execute(body); - if (taskProcess != null && response != null) { - taskProcess.receiveRequest(response); - } - } catch (DmaapEmptyResponseException e) { - logger.warn("Nothing to consume from DmaaP {} topic.", - resolveConfiguration().dmaapTopicName()); - } - - } - @Override public ConsumerDmaapModel execute(String object) throws PrhTaskException { extendedDmaapConsumerHttpClient = resolveClient(); @@ -93,7 +77,6 @@ public class DmaapConsumerTaskImpl extends prhAppConfig.initFileStreamReader(); } - @Override protected DmaapConsumerConfiguration resolveConfiguration() { return prhAppConfig.getDmaapConsumerConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index ba8e6e45..bd9a8744 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -26,9 +26,11 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp /** * @author Przemysław Wąsala on 3/23/18 */ -abstract class DmaapPublisherTask extends Task { +abstract class DmaapPublisherTask { abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException; abstract ExtendedDmaapProducerHttpClientImpl resolveClient(); + + abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 1a522921..7cbeb3b3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl; import org.slf4j.Logger; @@ -37,8 +36,7 @@ import org.springframework.stereotype.Component; * @author Przemysław Wąsala on 4/13/18 */ @Component -public class DmaapPublisherTaskImpl extends - DmaapPublisherTask { +public class DmaapPublisherTaskImpl extends DmaapPublisherTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; @@ -58,14 +56,6 @@ public class DmaapPublisherTaskImpl extends .orElseThrow(() -> new DmaapNotFoundException("Incorrect response from Dmaap")); } - @Override - protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException { - Integer response = execute(body); - if (taskProcess != null && response != null) { - taskProcess.receiveRequest(response); - } - } - @Override public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) @@ -75,7 +65,6 @@ public class DmaapPublisherTaskImpl extends return publish(consumerDmaapModel); } - @Override protected DmaapPublisherConfiguration resolveConfiguration() { return prhAppConfig.getDmaapPublisherConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index f7767101..addeaae2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -19,11 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; /** * @author Przemysław Wąsala on 3/23/18 @@ -46,19 +47,24 @@ public class ScheduledTasks { } public void scheduleMainPrhEventTask() { - logger.trace("Execution of task was registered"); - setTaskExecutionFlow(); - try { - dmaapConsumerTask.initConfigs(); - dmaapConsumerTask.receiveRequest(""); - } catch (PrhTaskException e) { - logger - .warn("Chain of tasks have been aborted, because some errors occur in prh workflow ", e); - } - } + logger.trace("Execution of tasks was registered"); - private void setTaskExecutionFlow() { - dmaapConsumerTask.setNext(aaiProducerTask); - aaiProducerTask.setNext(dmaapProducerTask); + Mono.fromSupplier(() -> Mono.fromCallable(() -> + { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); + }).subscribe(consumerDmaapModel -> Mono + .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel)) + .subscribe( + aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel)) + .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp), + error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error), + () -> logger.info("Completed DmaapPublisher task"))), + errorResponse -> logger + .warn("Error has been thrown in AAIProducerTask: {}", errorResponse) + , () -> logger.info("Completed AAIProducer task"))) + .subscribe(Disposable::dispose, tasksError -> logger + .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError) + , () -> logger.info("PRH tasks was consumed properly")).dispose(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java deleted file mode 100644 index e2b11fdc..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcaegen2.services.prh.tasks; - -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; - -/** - * @author Przemysław Wąsala on 4/13/18 - */ - - -public abstract class Task { - - Task taskProcess; - - abstract protected void receiveRequest(R body) throws PrhTaskException; - - abstract protected S execute(R object) throws PrhTaskException; - - abstract protected C resolveConfiguration(); - - void setNext(Task task) { - this.taskProcess = task; - } -} -- cgit 1.2.3-korg