summaryrefslogtreecommitdiffstats
path: root/prh-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java60
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java32
3 files changed, 90 insertions, 11 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index ec8ffaff..e2a91f78 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -26,13 +26,22 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import reactor.netty.http.client.HttpClientResponse;
import reactor.core.publisher.Mono;
+import org.apache.http.HttpResponse;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
interface DmaapPublisherTask {
+ /**
+ *
+ * Does not work reactive version with DMaaP MR - to be investigated why in future
+ * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ * */
+ @Deprecated
Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException;
+ Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+
DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index c25528bd..1a9abf0f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -22,22 +22,24 @@ package org.onap.dcaegen2.services.prh.tasks;
import java.util.Optional;
import javax.net.ssl.SSLException;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-
+import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import reactor.netty.http.client.HttpClientResponse;
import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClientResponse;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -46,13 +48,14 @@ import reactor.core.publisher.Mono;
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+ private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl();
private Config config;
-
private final PublisherReactiveHttpClientFactory httpClientFactory;
@Autowired
public DmaapPublisherTaskImpl(Config config) {
- this(config, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),new PnfReadyJsonBodyBuilderImpl()));
+ this(config,
+ new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), new PnfReadyJsonBodyBuilderImpl()));
}
DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) {
@@ -61,18 +64,55 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
}
@Override
- public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException,SSLException {
+ public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel)
+ throws DmaapNotFoundException, SSLException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel,Optional.empty());
+ return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty());
+ }
+
+
+ @Override
+ public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException {
+ return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+
}
+ /**
+ *
+ * Does not work reactive version with DMaaP MR - to be investigated why in future
+ * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+ * */
@Override
- public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException{
- return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+ public Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
+ String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
+ DefaultHttpClient httpClient = new DefaultHttpClient();
+ HttpPost postRequest = new HttpPost(getUrl());
+ try {
+ StringEntity input = new StringEntity(json);
+ input.setContentType(config.getDmaapPublisherConfiguration().dmaapContentType());
+ postRequest.setEntity(input);
+ HttpResponse response = httpClient.execute(postRequest);
+ return Mono.just(response);
+ } catch (Exception e) {
+ LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
+ return Mono.error(e);
+ }
+ }
+ private String getUrl() {
+ return (new URIBuilder()).scheme(config.getDmaapPublisherConfiguration().dmaapProtocol())
+ .host(config.getDmaapPublisherConfiguration().dmaapHostName())
+ .port(config.getDmaapPublisherConfiguration().dmaapPortNumber()).path(this.createRequestPath()).build()
+ .toString();
}
+
+ private String createRequestPath() {
+ return "/" + config.getDmaapPublisherConfiguration().dmaapTopicName();
+ }
+
+
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 16a6f8c5..a7bf42d1 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import javax.net.ssl.SSLException;
+
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -42,6 +43,7 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClientResponse;
+import org.apache.http.HttpResponse;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -97,7 +99,7 @@ public class ScheduledTasks {
.flatMap(this::processAdditionalFields)
.doOnError(exception ->
logger.warn("BBSActionsTask exception has been registered: ", exception))
- .flatMap(this::publishToDmaapConfiguration)
+ .flatMap(this::publishToDmaapConfigurationWithApache)
.doOnError(exception ->
logger.warn("DMaaPProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
@@ -115,6 +117,10 @@ public class ScheduledTasks {
logger.info("PRH tasks have been completed");
}
+ /**
+ * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
+ * */
+ @Deprecated
private void onSuccess(HttpClientResponse response) {
String statusCode = Integer.toString(response.status().code());
MDC.put(RESPONSE_CODE, statusCode);
@@ -123,6 +129,16 @@ public class ScheduledTasks {
MDC.remove(RESPONSE_CODE);
}
+ private void onSuccess(HttpResponse response) {
+ String statusCode = Integer.toString(response.getStatusLine().getStatusCode());
+ MDC.put(RESPONSE_CODE, statusCode);
+ logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ statusCode);
+ MDC.remove(RESPONSE_CODE);
+ }
+
+
+
private void onError(Throwable throwable) {
if (!(throwable instanceof DmaapEmptyResponseException)) {
logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
@@ -159,6 +175,10 @@ public class ScheduledTasks {
return bbsActionsTask.execute(consumerDmaapModel);
}
+ /**
+ * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
+ * */
+ @Deprecated
private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);
@@ -167,6 +187,16 @@ public class ScheduledTasks {
}
}
+ private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) {
+ try {
+ return dmaapProducerTask.executeWithApache(monoAaiModel);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+
+
private Predicate<Throwable> resumePrhPredicate() {
return exception -> exception instanceof PrhTaskException;
}