diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 16a6f8c5..a7bf42d1 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import javax.net.ssl.SSLException; + import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -42,6 +43,7 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClientResponse; +import org.apache.http.HttpResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -97,7 +99,7 @@ public class ScheduledTasks { .flatMap(this::processAdditionalFields) .doOnError(exception -> logger.warn("BBSActionsTask exception has been registered: ", exception)) - .flatMap(this::publishToDmaapConfiguration) + .flatMap(this::publishToDmaapConfigurationWithApache) .doOnError(exception -> logger.warn("DMaaPProducerTask exception has been registered: ", exception)) .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) @@ -115,6 +117,10 @@ public class ScheduledTasks { logger.info("PRH tasks have been completed"); } + /** + * Marked as deprecated due to problems with DMaaP MR, to be fixed in future + * */ + @Deprecated private void onSuccess(HttpClientResponse response) { String statusCode = Integer.toString(response.status().code()); MDC.put(RESPONSE_CODE, statusCode); @@ -123,6 +129,16 @@ public class ScheduledTasks { MDC.remove(RESPONSE_CODE); } + private void onSuccess(HttpResponse response) { + String statusCode = Integer.toString(response.getStatusLine().getStatusCode()); + MDC.put(RESPONSE_CODE, statusCode); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + statusCode); + MDC.remove(RESPONSE_CODE); + } + + + private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); @@ -159,6 +175,10 @@ public class ScheduledTasks { return bbsActionsTask.execute(consumerDmaapModel); } + /** + * Marked as deprecated due to problems with DMaaP MR, to be fixed in future + * */ + @Deprecated private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); @@ -167,6 +187,16 @@ public class ScheduledTasks { } } + private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) { + try { + return dmaapProducerTask.executeWithApache(monoAaiModel); + } catch (Exception e) { + return Mono.error(e); + } + } + + + private Predicate<Throwable> resumePrhPredicate() { return exception -> exception instanceof PrhTaskException; } |