aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java78
1 files changed, 29 insertions, 49 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 55a8bb58..3a724884 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.tasks;
+import com.google.gson.JsonPrimitive;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.uri.URI;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import javax.net.ssl.SSLException;
-import java.util.Optional;
import java.util.function.Supplier;
/**
@@ -46,70 +44,52 @@ import java.util.function.Supplier;
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Supplier<DmaapPublisherConfiguration> config;
+
+ private final Supplier<MessageRouterPublishRequest> config;
+ private final MessageRouterPublisherResolver messageRouterPublisherClientResolver;
private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl();
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) {
- this(config, new PublisherReactiveHttpClientFactory(
- new DmaaPRestTemplateFactory(),
- new PnfReadyJsonBodyBuilderImpl()));
- }
- DmaapPublisherTaskImpl(
- Supplier<DmaapPublisherConfiguration> config,
- PublisherReactiveHttpClientFactory httpClientFactory) {
+ public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) {
this.config = config;
- this.httpClientFactory = httpClientFactory;
+ this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver;
}
@Override
- public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
- execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException {
+ public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
+ MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty());
- }
-
- @Override
- public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.get());
-
+ String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
+ return messageRouterPublisher.put(
+ config.get(),
+ Flux.just(json).map(JsonPrimitive::new));
}
/**
*
* Does not work reactive version with DMaaP MR - to be investigated why in future
- * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
* */
@Override
public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
- DefaultHttpClient httpClient = new DefaultHttpClient();
- HttpPost postRequest = new HttpPost(getUrl());
- try {
- StringEntity input = new StringEntity(json);
- input.setContentType(config.get().dmaapContentType());
- postRequest.setEntity(input);
- HttpResponse response = httpClient.execute(postRequest);
- return Mono.just(response);
- } catch (Exception e) {
- LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
- return Mono.error(e);
+ try (DefaultHttpClient httpClient = new DefaultHttpClient()) {
+ HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl());
+ try {
+ StringEntity input = new StringEntity(json);
+ input.setContentType(config.get().contentType());
+ postRequest.setEntity(input);
+ HttpResponse response = httpClient.execute(postRequest);
+ return Mono.just(response);
+ } catch (Exception e) {
+ LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
+ return Mono.error(e);
+ }
}
}
- private String getUrl() {
- return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol())
- .host(config.get().dmaapHostName())
- .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build()
- .toString();
- }
- private String createRequestPath() {
- return "/" + config.get().dmaapTopicName();
- }
} \ No newline at end of file