diff options
Diffstat (limited to 'prh-app-server/src/main')
4 files changed, 18 insertions, 80 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index 4bb5a31c..e0dcf0b3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -52,13 +52,13 @@ public class AaiProducerTaskImpl extends AaiProducerTask { @Override Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { - Mono<HttpResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); - return resposne.flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response.statusCode())) { + Mono<HttpResponse> response = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); + return response.flatMap(r -> { + if (HttpUtils.isSuccessfulResponseCode(r.statusCode())) { return Mono.just(consumerDmaapModel); } return Mono - .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.statusCode())); + .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + r.statusCode())); }); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index f63f4d76..d1a42c4d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -24,15 +24,10 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ public interface DmaapPublisherTask { - - Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - - Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 3a724884..9cec7779 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -21,10 +21,6 @@ package org.onap.dcaegen2.services.prh.tasks; import com.google.gson.JsonPrimitive; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.DefaultHttpClient; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; @@ -34,7 +30,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import java.util.function.Supplier; @@ -67,29 +62,4 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { config.get(), Flux.just(json).map(JsonPrimitive::new)); } - - /** - * - * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); - * */ - @Override - public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { - String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); - try (DefaultHttpClient httpClient = new DefaultHttpClient()) { - HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl()); - try { - StringEntity input = new StringEntity(json); - input.setContentType(config.get().contentType()); - postRequest.setEntity(input); - HttpResponse response = httpClient.execute(postRequest); - return Mono.just(response); - } catch (Exception e) { - LOGGER.warn("Publishing to DMaaP MR failed: {}", e); - return Mono.error(e); - } - } - } - - }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 4b3436e5..8aad3eed 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -20,19 +20,22 @@ package org.onap.dcaegen2.services.prh.tasks; -import org.apache.http.HttpResponse; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; -import org.slf4j.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.netty.http.client.HttpClientResponse; import javax.net.ssl.SSLException; import java.util.Map; @@ -116,7 +119,7 @@ public class ScheduledTasks { .flatMap(this::processAdditionalFields) .doOnError(exception -> LOGGER.warn("BBSActionsTask exception has been registered: ", exception)) - .flatMap(this::publishToDmaapConfigurationWithApache) + .flatMap(this::publishToDmaapConfiguration) .doOnError(exception -> LOGGER.warn("DMaaPProducerTask exception has been registered: ", exception)) .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) @@ -134,27 +137,15 @@ public class ScheduledTasks { LOGGER.info("PRH tasks have been completed"); } - /** - * Marked as deprecated due to problems with DMaaP MR, to be fixed in future - */ - @Deprecated - private void onSuccess(HttpClientResponse response) { - String statusCode = Integer.toString(response.status().code()); - MDC.put(RESPONSE_CODE, statusCode); - LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - statusCode); - MDC.remove(RESPONSE_CODE); - } - - private void onSuccess(HttpResponse response) { - String statusCode = Integer.toString(response.getStatusLine().getStatusCode()); - MDC.put(RESPONSE_CODE, statusCode); - LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - statusCode); - MDC.remove(RESPONSE_CODE); + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + String statusCodeOk = HttpStatus.OK.name(); + MDC.put(RESPONSE_CODE, statusCodeOk); + LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk); + MDC.remove(RESPONSE_CODE); + } } - private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); @@ -206,10 +197,6 @@ public class ScheduledTasks { return bbsActionsTask.execute(state.DmaapModel).map(x -> state); } - /** - * Marked as deprecated due to problems with DMaaP MR, to be fixed in future - */ - @Deprecated private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) { try { @@ -224,20 +211,6 @@ public class ScheduledTasks { } } - private Mono<org.apache.http.HttpResponse> - publishToDmaapConfigurationWithApache(final State state) { - try { - if (state.ActivationStatus) { - LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); - return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel); - } - - return dmaapReadyProducerTask.executeWithApache(state.DmaapModel); - } catch (Exception e) { - return Mono.error(e); - } - } - private Predicate<Throwable> resumePrhPredicate() { return exception -> exception instanceof PrhTaskException; } |