diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 152 |
1 files changed, 96 insertions, 56 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index a7bf42d1..aae5bc77 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -20,30 +20,27 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.function.Predicate; -import javax.net.ssl.SSLException; - +import org.apache.http.HttpResponse; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; +import org.slf4j.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClientResponse; -import org.apache.http.HttpResponse; + +import javax.net.ssl.SSLException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; + +import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID; +import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -54,7 +51,9 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final DmaapConsumerTask dmaapConsumerTask; - private final DmaapPublisherTask dmaapProducerTask; + private final DmaapPublisherTask dmaapReadyProducerTask; + private final DmaapPublisherTask dmaapUpdateProducerTask; + private final AaiQueryTask aaiQueryTask; private final AaiProducerTask aaiProducerTask; private final BbsActionsTask bbsActionsTask; private Map<String, String> mdcContextMap; @@ -62,24 +61,39 @@ public class ScheduledTasks { /** * Constructor for tasks registration in PRHWorkflow. * - * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - third task - * @param aaiPublisherTask - second task + * @param dmaapConsumerTask - fist task + * @param dmaapReadyPublisherTask - third task + * @param dmaapUpdatePublisherTask - fourth task + * @param aaiPublisherTask - second task */ @Autowired public ScheduledTasks( - DmaapConsumerTask dmaapConsumerTask, - DmaapPublisherTask dmaapPublisherTask, - AaiProducerTask aaiPublisherTask, - BbsActionsTask bbsActionsTask, - Map<String, String> mdcContextMap) { + final DmaapConsumerTask dmaapConsumerTask, + @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, + @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, + final AaiQueryTask aaiQueryTask, + final AaiProducerTask aaiPublisherTask, + final BbsActionsTask bbsActionsTask, + final Map<String, String> mdcContextMap) { this.dmaapConsumerTask = dmaapConsumerTask; - this.dmaapProducerTask = dmaapPublisherTask; + this.dmaapReadyProducerTask = dmaapReadyPublisherTask; + this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask; + this.aaiQueryTask = aaiQueryTask; this.aaiProducerTask = aaiPublisherTask; this.bbsActionsTask = bbsActionsTask; this.mdcContextMap = mdcContextMap; } + static class State { + public final ConsumerDmaapModel DmaapModel; + public final Boolean ActivationStatus; + + public State(final ConsumerDmaapModel DmaapModel, final Boolean ActivationStatus) { + this.DmaapModel = DmaapModel; + this.ActivationStatus = ActivationStatus; + } + } + /** * Main function for scheduling prhWorkflow. */ @@ -89,22 +103,23 @@ public class ScheduledTasks { logger.trace("Execution of tasks was registered"); CountDownLatch mainCountDownLatch = new CountDownLatch(1); consumeFromDMaaPMessage() - .doOnError(DmaapEmptyResponseException.class, error -> - logger.warn("Nothing to consume from DMaaP") - ) - .flatMap(this::publishToAaiConfiguration) - .doOnError(exception -> - logger.warn("AAIProducerTask exception has been registered: ", exception)) - .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) - .flatMap(this::processAdditionalFields) - .doOnError(exception -> - logger.warn("BBSActionsTask exception has been registered: ", exception)) - .flatMap(this::publishToDmaapConfigurationWithApache) - .doOnError(exception -> - logger.warn("DMaaPProducerTask exception has been registered: ", exception)) - .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) - .doOnTerminate(mainCountDownLatch::countDown) - .subscribe(this::onSuccess, this::onError, this::onComplete); + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::queryAaiForConfiguration) + .flatMap(this::publishToAaiConfiguration) + .doOnError(exception -> + logger.warn("AAIProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) + .flatMap(this::processAdditionalFields) + .doOnError(exception -> + logger.warn("BBSActionsTask exception has been registered: ", exception)) + .flatMap(this::publishToDmaapConfigurationWithApache) + .doOnError(exception -> + logger.warn("DMaaPProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); mainCountDownLatch.await(); } catch (InterruptedException e) { @@ -119,13 +134,13 @@ public class ScheduledTasks { /** * Marked as deprecated due to problems with DMaaP MR, to be fixed in future - * */ + */ @Deprecated private void onSuccess(HttpClientResponse response) { String statusCode = Integer.toString(response.status().code()); MDC.put(RESPONSE_CODE, statusCode); logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - statusCode); + statusCode); MDC.remove(RESPONSE_CODE); } @@ -133,12 +148,11 @@ public class ScheduledTasks { String statusCode = Integer.toString(response.getStatusLine().getStatusCode()); MDC.put(RESPONSE_CODE, statusCode); logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - statusCode); + statusCode); MDC.remove(RESPONSE_CODE); } - private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); @@ -163,40 +177,66 @@ public class ScheduledTasks { } } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { + private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { + return aaiQueryTask + .execute(monoDMaaPModel) + .map(x -> new State(monoDMaaPModel, x)); + } + + private Mono<State> publishToAaiConfiguration(final State state) { try { - return aaiProducerTask.execute(monoDMaaPModel); + return state.ActivationStatus + ? Mono.just(state) + : aaiProducerTask + .execute(state.DmaapModel) + .map(x -> state); } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<ConsumerDmaapModel> processAdditionalFields(ConsumerDmaapModel consumerDmaapModel) { - return bbsActionsTask.execute(consumerDmaapModel); + private Mono<State> processAdditionalFields(final State state) { + if (state.ActivationStatus) { + logger.debug("Re-registration - Logical links won't be updated."); + + return Mono.just(state); + } + + return bbsActionsTask.execute(state.DmaapModel).map(x -> state); } /** * Marked as deprecated due to problems with DMaaP MR, to be fixed in future - * */ + */ @Deprecated - private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { + private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + publishToDmaapConfiguration(final State state) { try { - return dmaapProducerTask.execute(monoAaiModel); + if (state.ActivationStatus) { + logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + return dmaapUpdateProducerTask.execute(state.DmaapModel); + } + + return dmaapReadyProducerTask.execute(state.DmaapModel); } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) { + private Mono<org.apache.http.HttpResponse> + publishToDmaapConfigurationWithApache(final State state) { try { - return dmaapProducerTask.executeWithApache(monoAaiModel); + if (state.ActivationStatus) { + logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel); + } + + return dmaapReadyProducerTask.executeWithApache(state.DmaapModel); } catch (Exception e) { return Mono.error(e); } } - - private Predicate<Throwable> resumePrhPredicate() { return exception -> exception instanceof PrhTaskException; } |