aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java152
1 files changed, 96 insertions, 56 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index a7bf42d1..aae5bc77 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -20,30 +20,27 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Predicate;
-import javax.net.ssl.SSLException;
-
+import org.apache.http.HttpResponse;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
+import org.slf4j.*;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClientResponse;
-import org.apache.http.HttpResponse;
+
+import javax.net.ssl.SSLException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
+
+import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID;
+import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -54,7 +51,9 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private final DmaapConsumerTask dmaapConsumerTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ private final DmaapPublisherTask dmaapReadyProducerTask;
+ private final DmaapPublisherTask dmaapUpdateProducerTask;
+ private final AaiQueryTask aaiQueryTask;
private final AaiProducerTask aaiProducerTask;
private final BbsActionsTask bbsActionsTask;
private Map<String, String> mdcContextMap;
@@ -62,24 +61,39 @@ public class ScheduledTasks {
/**
* Constructor for tasks registration in PRHWorkflow.
*
- * @param dmaapConsumerTask - fist task
- * @param dmaapPublisherTask - third task
- * @param aaiPublisherTask - second task
+ * @param dmaapConsumerTask - fist task
+ * @param dmaapReadyPublisherTask - third task
+ * @param dmaapUpdatePublisherTask - fourth task
+ * @param aaiPublisherTask - second task
*/
@Autowired
public ScheduledTasks(
- DmaapConsumerTask dmaapConsumerTask,
- DmaapPublisherTask dmaapPublisherTask,
- AaiProducerTask aaiPublisherTask,
- BbsActionsTask bbsActionsTask,
- Map<String, String> mdcContextMap) {
+ final DmaapConsumerTask dmaapConsumerTask,
+ @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
+ @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
+ final AaiQueryTask aaiQueryTask,
+ final AaiProducerTask aaiPublisherTask,
+ final BbsActionsTask bbsActionsTask,
+ final Map<String, String> mdcContextMap) {
this.dmaapConsumerTask = dmaapConsumerTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
+ this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
+ this.aaiQueryTask = aaiQueryTask;
this.aaiProducerTask = aaiPublisherTask;
this.bbsActionsTask = bbsActionsTask;
this.mdcContextMap = mdcContextMap;
}
+ static class State {
+ public final ConsumerDmaapModel DmaapModel;
+ public final Boolean ActivationStatus;
+
+ public State(final ConsumerDmaapModel DmaapModel, final Boolean ActivationStatus) {
+ this.DmaapModel = DmaapModel;
+ this.ActivationStatus = ActivationStatus;
+ }
+ }
+
/**
* Main function for scheduling prhWorkflow.
*/
@@ -89,22 +103,23 @@ public class ScheduledTasks {
logger.trace("Execution of tasks was registered");
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
consumeFromDMaaPMessage()
- .doOnError(DmaapEmptyResponseException.class, error ->
- logger.warn("Nothing to consume from DMaaP")
- )
- .flatMap(this::publishToAaiConfiguration)
- .doOnError(exception ->
- logger.warn("AAIProducerTask exception has been registered: ", exception))
- .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
- .flatMap(this::processAdditionalFields)
- .doOnError(exception ->
- logger.warn("BBSActionsTask exception has been registered: ", exception))
- .flatMap(this::publishToDmaapConfigurationWithApache)
- .doOnError(exception ->
- logger.warn("DMaaPProducerTask exception has been registered: ", exception))
- .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
- .doOnTerminate(mainCountDownLatch::countDown)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ .doOnError(DmaapEmptyResponseException.class, error ->
+ logger.warn("Nothing to consume from DMaaP")
+ )
+ .flatMap(this::queryAaiForConfiguration)
+ .flatMap(this::publishToAaiConfiguration)
+ .doOnError(exception ->
+ logger.warn("AAIProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
+ .flatMap(this::processAdditionalFields)
+ .doOnError(exception ->
+ logger.warn("BBSActionsTask exception has been registered: ", exception))
+ .flatMap(this::publishToDmaapConfigurationWithApache)
+ .doOnError(exception ->
+ logger.warn("DMaaPProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
mainCountDownLatch.await();
} catch (InterruptedException e) {
@@ -119,13 +134,13 @@ public class ScheduledTasks {
/**
* Marked as deprecated due to problems with DMaaP MR, to be fixed in future
- * */
+ */
@Deprecated
private void onSuccess(HttpClientResponse response) {
String statusCode = Integer.toString(response.status().code());
MDC.put(RESPONSE_CODE, statusCode);
logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- statusCode);
+ statusCode);
MDC.remove(RESPONSE_CODE);
}
@@ -133,12 +148,11 @@ public class ScheduledTasks {
String statusCode = Integer.toString(response.getStatusLine().getStatusCode());
MDC.put(RESPONSE_CODE, statusCode);
logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- statusCode);
+ statusCode);
MDC.remove(RESPONSE_CODE);
}
-
private void onError(Throwable throwable) {
if (!(throwable instanceof DmaapEmptyResponseException)) {
logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
@@ -163,40 +177,66 @@ public class ScheduledTasks {
}
}
- private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
+ private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
+ return aaiQueryTask
+ .execute(monoDMaaPModel)
+ .map(x -> new State(monoDMaaPModel, x));
+ }
+
+ private Mono<State> publishToAaiConfiguration(final State state) {
try {
- return aaiProducerTask.execute(monoDMaaPModel);
+ return state.ActivationStatus
+ ? Mono.just(state)
+ : aaiProducerTask
+ .execute(state.DmaapModel)
+ .map(x -> state);
} catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}
- private Mono<ConsumerDmaapModel> processAdditionalFields(ConsumerDmaapModel consumerDmaapModel) {
- return bbsActionsTask.execute(consumerDmaapModel);
+ private Mono<State> processAdditionalFields(final State state) {
+ if (state.ActivationStatus) {
+ logger.debug("Re-registration - Logical links won't be updated.");
+
+ return Mono.just(state);
+ }
+
+ return bbsActionsTask.execute(state.DmaapModel).map(x -> state);
}
/**
* Marked as deprecated due to problems with DMaaP MR, to be fixed in future
- * */
+ */
@Deprecated
- private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
+ private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
+ publishToDmaapConfiguration(final State state) {
try {
- return dmaapProducerTask.execute(monoAaiModel);
+ if (state.ActivationStatus) {
+ logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+ return dmaapUpdateProducerTask.execute(state.DmaapModel);
+ }
+
+ return dmaapReadyProducerTask.execute(state.DmaapModel);
} catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}
- private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) {
+ private Mono<org.apache.http.HttpResponse>
+ publishToDmaapConfigurationWithApache(final State state) {
try {
- return dmaapProducerTask.executeWithApache(monoAaiModel);
+ if (state.ActivationStatus) {
+ logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+ return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel);
+ }
+
+ return dmaapReadyProducerTask.executeWithApache(state.DmaapModel);
} catch (Exception e) {
return Mono.error(e);
}
}
-
-
private Predicate<Throwable> resumePrhPredicate() {
return exception -> exception instanceof PrhTaskException;
}