diff options
Diffstat (limited to 'prh-app-server')
3 files changed, 90 insertions, 11 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index ec8ffaff..e2a91f78 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -26,13 +26,22 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; +import org.apache.http.HttpResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ interface DmaapPublisherTask { + /** + * + * Does not work reactive version with DMaaP MR - to be investigated why in future + * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + * */ + @Deprecated Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; + Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index c25528bd..1a9abf0f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -22,22 +22,24 @@ package org.onap.dcaegen2.services.prh.tasks; import java.util.Optional; import javax.net.ssl.SSLException; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; - import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; - +import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -46,13 +48,14 @@ import reactor.core.publisher.Mono; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl(); private Config config; - private final PublisherReactiveHttpClientFactory httpClientFactory; @Autowired public DmaapPublisherTaskImpl(Config config) { - this(config, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),new PnfReadyJsonBodyBuilderImpl())); + this(config, + new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), new PnfReadyJsonBodyBuilderImpl())); } DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) { @@ -61,18 +64,55 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { } @Override - public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException,SSLException { + public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) + throws DmaapNotFoundException, SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel,Optional.empty()); + return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty()); + } + + + @Override + public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException { + return httpClientFactory.create(config.getDmaapPublisherConfiguration()); + } + /** + * + * Does not work reactive version with DMaaP MR - to be investigated why in future + * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + * */ @Override - public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException{ - return httpClientFactory.create(config.getDmaapPublisherConfiguration()); + public Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { + String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); + DefaultHttpClient httpClient = new DefaultHttpClient(); + HttpPost postRequest = new HttpPost(getUrl()); + try { + StringEntity input = new StringEntity(json); + input.setContentType(config.getDmaapPublisherConfiguration().dmaapContentType()); + postRequest.setEntity(input); + HttpResponse response = httpClient.execute(postRequest); + return Mono.just(response); + } catch (Exception e) { + LOGGER.warn("Publishing to DMaaP MR failed: {}", e); + return Mono.error(e); + } + } + private String getUrl() { + return (new URIBuilder()).scheme(config.getDmaapPublisherConfiguration().dmaapProtocol()) + .host(config.getDmaapPublisherConfiguration().dmaapHostName()) + .port(config.getDmaapPublisherConfiguration().dmaapPortNumber()).path(this.createRequestPath()).build() + .toString(); } + + private String createRequestPath() { + return "/" + config.getDmaapPublisherConfiguration().dmaapTopicName(); + } + + }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 16a6f8c5..a7bf42d1 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import javax.net.ssl.SSLException; + import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -42,6 +43,7 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClientResponse; +import org.apache.http.HttpResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -97,7 +99,7 @@ public class ScheduledTasks { .flatMap(this::processAdditionalFields) .doOnError(exception -> logger.warn("BBSActionsTask exception has been registered: ", exception)) - .flatMap(this::publishToDmaapConfiguration) + .flatMap(this::publishToDmaapConfigurationWithApache) .doOnError(exception -> logger.warn("DMaaPProducerTask exception has been registered: ", exception)) .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) @@ -115,6 +117,10 @@ public class ScheduledTasks { logger.info("PRH tasks have been completed"); } + /** + * Marked as deprecated due to problems with DMaaP MR, to be fixed in future + * */ + @Deprecated private void onSuccess(HttpClientResponse response) { String statusCode = Integer.toString(response.status().code()); MDC.put(RESPONSE_CODE, statusCode); @@ -123,6 +129,16 @@ public class ScheduledTasks { MDC.remove(RESPONSE_CODE); } + private void onSuccess(HttpResponse response) { + String statusCode = Integer.toString(response.getStatusLine().getStatusCode()); + MDC.put(RESPONSE_CODE, statusCode); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + statusCode); + MDC.remove(RESPONSE_CODE); + } + + + private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); @@ -159,6 +175,10 @@ public class ScheduledTasks { return bbsActionsTask.execute(consumerDmaapModel); } + /** + * Marked as deprecated due to problems with DMaaP MR, to be fixed in future + * */ + @Deprecated private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); @@ -167,6 +187,16 @@ public class ScheduledTasks { } } + private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) { + try { + return dmaapProducerTask.executeWithApache(monoAaiModel); + } catch (Exception e) { + return Mono.error(e); + } + } + + + private Predicate<Throwable> resumePrhPredicate() { return exception -> exception instanceof PrhTaskException; } |