diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-06-22 20:11:20 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-08-06 08:51:56 +0200 |
commit | 2cfcc6756e59ed8cda571efa8b29764eab7837c8 (patch) | |
tree | e99375044d7ca9a9cac0962e459cafd5f580b094 | |
parent | a684d478f8b81bba83123d4f1fd1ec3c29df73ca (diff) |
Added reactive tasks flow control
Change-Id: I9cb2bede66e9e446912f2e6a815c7b56b80813b9
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
12 files changed, 41 insertions, 117 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java index 784bc5af..df8330f4 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java @@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIConsumerClient; -public abstract class AAIConsumerTask<R, S, C> extends Task<R, S, C> { +public abstract class AAIConsumerTask { abstract Optional<String> consume(ConsumerDmaapModel message) throws AAINotFoundException; abstract AAIConsumerClient resolveClient(); + + abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java index 4c35b2ee..c545a1be 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java @@ -25,7 +25,6 @@ import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIConsumerClient; import org.slf4j.Logger; @@ -34,8 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component -public class AAIConsumerTaskImpl extends - AAIConsumerTask<ConsumerDmaapModel, String, AAIClientConfiguration> { +public class AAIConsumerTaskImpl extends AAIConsumerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -59,14 +57,6 @@ public class AAIConsumerTaskImpl extends } @Override - protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException { - String response = execute(body); - if (taskProcess != null && response != null && !response.isEmpty()) { - taskProcess.receiveRequest(response); - } - } - - @Override public String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) .orElseThrow(() -> new AAINotFoundException("Invoked null object to AAI task")); @@ -75,7 +65,6 @@ public class AAIConsumerTaskImpl extends return consume(consumerDmaapModel).orElseThrow(() -> new AAINotFoundException("Null response code")); } - @Override protected AAIClientConfiguration resolveConfiguration() { return prhAppConfig.getAAIClientConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java index c7bde032..abd04640 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java @@ -20,15 +20,18 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIProducerClient; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ -public abstract class AAIProducerTask<R, S, C> extends Task<R, S, C> { +public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ { abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException; abstract AAIProducerClient resolveClient(); + + abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java index b637bb29..005d08d1 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java @@ -19,11 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import java.net.URISyntaxException; +import java.util.Optional; import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.AAIProducerClient; import org.onap.dcaegen2.services.prh.service.HttpUtils; @@ -32,15 +33,12 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.net.URISyntaxException; -import java.util.Optional; - /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @Component public class AAIProducerTaskImpl extends - AAIProducerTask<ConsumerDmaapModel, ConsumerDmaapModel, AAIClientConfiguration> { + AAIProducerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -66,14 +64,6 @@ public class AAIProducerTaskImpl extends } @Override - protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException { - ConsumerDmaapModel response = execute(body); - if (taskProcess != null && response != null) { - taskProcess.receiveRequest(response); - } - } - - @Override public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) .orElseThrow(() -> new AAINotFoundException("Invoked null object to AAI task")); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 3e36bcdd..56b678a3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -26,11 +26,13 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ -abstract class DmaapConsumerTask<R, S, C> extends Task<R, S, C> { +abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ { abstract ConsumerDmaapModel consume(String message) throws PrhTaskException; abstract ExtendedDmaapConsumerHttpClientImpl resolveClient(); abstract void initConfigs(); + + abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 43eb9eaa..e72939cf 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -23,7 +23,6 @@ import java.util.Optional; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; -import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -38,8 +37,7 @@ import org.springframework.stereotype.Component; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ @Component -public class DmaapConsumerTaskImpl extends - DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> { +public class DmaapConsumerTaskImpl extends DmaapConsumerTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; @@ -67,20 +65,6 @@ public class DmaapConsumerTaskImpl extends } @Override - protected void receiveRequest(String body) throws PrhTaskException { - try { - ConsumerDmaapModel response = execute(body); - if (taskProcess != null && response != null) { - taskProcess.receiveRequest(response); - } - } catch (DmaapEmptyResponseException e) { - logger.warn("Nothing to consume from DmaaP {} topic.", - resolveConfiguration().dmaapTopicName()); - } - - } - - @Override public ConsumerDmaapModel execute(String object) throws PrhTaskException { extendedDmaapConsumerHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); @@ -93,7 +77,6 @@ public class DmaapConsumerTaskImpl extends prhAppConfig.initFileStreamReader(); } - @Override protected DmaapConsumerConfiguration resolveConfiguration() { return prhAppConfig.getDmaapConsumerConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index ba8e6e45..bd9a8744 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -26,9 +26,11 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ -abstract class DmaapPublisherTask<R, S, C> extends Task<R, S, C> { +abstract class DmaapPublisherTask { abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException; abstract ExtendedDmaapProducerHttpClientImpl resolveClient(); + + abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 1a522921..7cbeb3b3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl; import org.slf4j.Logger; @@ -37,8 +36,7 @@ import org.springframework.stereotype.Component; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @Component -public class DmaapPublisherTaskImpl extends - DmaapPublisherTask<ConsumerDmaapModel, Integer, DmaapPublisherConfiguration> { +public class DmaapPublisherTaskImpl extends DmaapPublisherTask { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; @@ -59,14 +57,6 @@ public class DmaapPublisherTaskImpl extends } @Override - protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException { - Integer response = execute(body); - if (taskProcess != null && response != null) { - taskProcess.receiveRequest(response); - } - } - - @Override public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to Dmaap task")); @@ -75,7 +65,6 @@ public class DmaapPublisherTaskImpl extends return publish(consumerDmaapModel); } - @Override protected DmaapPublisherConfiguration resolveConfiguration() { return prhAppConfig.getDmaapPublisherConfiguration(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index f7767101..addeaae2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -19,11 +19,12 @@ */ package org.onap.dcaegen2.services.prh.tasks; -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,19 +47,24 @@ public class ScheduledTasks { } public void scheduleMainPrhEventTask() { - logger.trace("Execution of task was registered"); - setTaskExecutionFlow(); - try { - dmaapConsumerTask.initConfigs(); - dmaapConsumerTask.receiveRequest(""); - } catch (PrhTaskException e) { - logger - .warn("Chain of tasks have been aborted, because some errors occur in prh workflow ", e); - } - } + logger.trace("Execution of tasks was registered"); - private void setTaskExecutionFlow() { - dmaapConsumerTask.setNext(aaiProducerTask); - aaiProducerTask.setNext(dmaapProducerTask); + Mono.fromSupplier(() -> Mono.fromCallable(() -> + { + dmaapConsumerTask.initConfigs(); + return dmaapConsumerTask.execute(""); + }).subscribe(consumerDmaapModel -> Mono + .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel)) + .subscribe( + aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel)) + .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp), + error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error), + () -> logger.info("Completed DmaapPublisher task"))), + errorResponse -> logger + .warn("Error has been thrown in AAIProducerTask: {}", errorResponse) + , () -> logger.info("Completed AAIProducer task"))) + .subscribe(Disposable::dispose, tasksError -> logger + .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError) + , () -> logger.info("PRH tasks was consumed properly")).dispose(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java deleted file mode 100644 index e2b11fdc..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcaegen2.services.prh.tasks; - -import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - */ - - -public abstract class Task<R, S, C> { - - Task taskProcess; - - abstract protected void receiveRequest(R body) throws PrhTaskException; - - abstract protected S execute(R object) throws PrhTaskException; - - abstract protected C resolveConfiguration(); - - void setNext(Task task) { - this.taskProcess = task; - } -} diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java index 60e1bd54..5736afeb 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java @@ -41,7 +41,7 @@ public class DmaapConsumerTaskSpy { @Bean @Primary - public Task registerSimpleDmaapConsumerTask() { + public DmaapConsumerTask registerSimpleDmaapConsumerTask() { AppConfig appConfig = spy(AppConfig.class); doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration(); DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig)); diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java index b2b97cf0..01056606 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java @@ -39,7 +39,7 @@ public class DmaapProducerTaskSpy { @Bean @Primary - public Task registerSimpleDmaapPublisherTask() { + public DmaapPublisherTask registerSimpleDmaapPublisherTask() { AppConfig appConfig = spy(AppConfig.class); doReturn(mock(DmaapPublisherConfiguration.class)).when(appConfig).getDmaapPublisherConfiguration(); DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig)); |