diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java | 78 |
1 files changed, 29 insertions, 49 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 55a8bb58..3a724884 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.prh.tasks; +import com.google.gson.JsonPrimitive; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.uri.URI; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.net.ssl.SSLException; -import java.util.Optional; import java.util.function.Supplier; /** @@ -46,70 +44,52 @@ import java.util.function.Supplier; public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Supplier<DmaapPublisherConfiguration> config; + + private final Supplier<MessageRouterPublishRequest> config; + private final MessageRouterPublisherResolver messageRouterPublisherClientResolver; private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl(); - private final PublisherReactiveHttpClientFactory httpClientFactory; - public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) { - this(config, new PublisherReactiveHttpClientFactory( - new DmaaPRestTemplateFactory(), - new PnfReadyJsonBodyBuilderImpl())); - } - DmaapPublisherTaskImpl( - Supplier<DmaapPublisherConfiguration> config, - PublisherReactiveHttpClientFactory httpClientFactory) { + public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) { this.config = config; - this.httpClientFactory = httpClientFactory; + this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver; } @Override - public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> - execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException { + public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); + MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty()); - } - - @Override - public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.get()); - + String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); + return messageRouterPublisher.put( + config.get(), + Flux.just(json).map(JsonPrimitive::new)); } /** * * Does not work reactive version with DMaaP MR - to be investigated why in future - * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); * */ @Override public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); - DefaultHttpClient httpClient = new DefaultHttpClient(); - HttpPost postRequest = new HttpPost(getUrl()); - try { - StringEntity input = new StringEntity(json); - input.setContentType(config.get().dmaapContentType()); - postRequest.setEntity(input); - HttpResponse response = httpClient.execute(postRequest); - return Mono.just(response); - } catch (Exception e) { - LOGGER.warn("Publishing to DMaaP MR failed: {}", e); - return Mono.error(e); + try (DefaultHttpClient httpClient = new DefaultHttpClient()) { + HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl()); + try { + StringEntity input = new StringEntity(json); + input.setContentType(config.get().contentType()); + postRequest.setEntity(input); + HttpResponse response = httpClient.execute(postRequest); + return Mono.just(response); + } catch (Exception e) { + LOGGER.warn("Publishing to DMaaP MR failed: {}", e); + return Mono.error(e); + } } } - private String getUrl() { - return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol()) - .host(config.get().dmaapHostName()) - .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build() - .toString(); - } - private String createRequestPath() { - return "/" + config.get().dmaapTopicName(); - } }
\ No newline at end of file |