aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java55
1 files changed, 14 insertions, 41 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 4b3436e5..8aad3eed 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -20,19 +20,22 @@
package org.onap.dcaegen2.services.prh.tasks;
-import org.apache.http.HttpResponse;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClientResponse;
import javax.net.ssl.SSLException;
import java.util.Map;
@@ -116,7 +119,7 @@ public class ScheduledTasks {
.flatMap(this::processAdditionalFields)
.doOnError(exception ->
LOGGER.warn("BBSActionsTask exception has been registered: ", exception))
- .flatMap(this::publishToDmaapConfigurationWithApache)
+ .flatMap(this::publishToDmaapConfiguration)
.doOnError(exception ->
LOGGER.warn("DMaaPProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
@@ -134,27 +137,15 @@ public class ScheduledTasks {
LOGGER.info("PRH tasks have been completed");
}
- /**
- * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
- */
- @Deprecated
- private void onSuccess(HttpClientResponse response) {
- String statusCode = Integer.toString(response.status().code());
- MDC.put(RESPONSE_CODE, statusCode);
- LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- statusCode);
- MDC.remove(RESPONSE_CODE);
- }
-
- private void onSuccess(HttpResponse response) {
- String statusCode = Integer.toString(response.getStatusLine().getStatusCode());
- MDC.put(RESPONSE_CODE, statusCode);
- LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- statusCode);
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ String statusCodeOk = HttpStatus.OK.name();
+ MDC.put(RESPONSE_CODE, statusCodeOk);
+ LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
+ MDC.remove(RESPONSE_CODE);
+ }
}
-
private void onError(Throwable throwable) {
if (!(throwable instanceof DmaapEmptyResponseException)) {
LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
@@ -206,10 +197,6 @@ public class ScheduledTasks {
return bbsActionsTask.execute(state.DmaapModel).map(x -> state);
}
- /**
- * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
- */
- @Deprecated
private Flux<MessageRouterPublishResponse>
publishToDmaapConfiguration(final State state) {
try {
@@ -224,20 +211,6 @@ public class ScheduledTasks {
}
}
- private Mono<org.apache.http.HttpResponse>
- publishToDmaapConfigurationWithApache(final State state) {
- try {
- if (state.ActivationStatus) {
- LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
- return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel);
- }
-
- return dmaapReadyProducerTask.executeWithApache(state.DmaapModel);
- } catch (Exception e) {
- return Mono.error(e);
- }
- }
-
private Predicate<Throwable> resumePrhPredicate() {
return exception -> exception instanceof PrhTaskException;
}