summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java30
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java55
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksTest.java22
5 files changed, 30 insertions, 90 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index 4bb5a31c..e0dcf0b3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -52,13 +52,13 @@ public class AaiProducerTaskImpl extends AaiProducerTask {
@Override
Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
- Mono<HttpResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel);
- return resposne.flatMap(response -> {
- if (HttpUtils.isSuccessfulResponseCode(response.statusCode())) {
+ Mono<HttpResponse> response = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel);
+ return response.flatMap(r -> {
+ if (HttpUtils.isSuccessfulResponseCode(r.statusCode())) {
return Mono.just(consumerDmaapModel);
}
return Mono
- .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.statusCode()));
+ .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + r.statusCode()));
});
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index f63f4d76..d1a42c4d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -24,15 +24,10 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
public interface DmaapPublisherTask {
-
-
Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
-
- Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 3a724884..9cec7779 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -21,10 +21,6 @@
package org.onap.dcaegen2.services.prh.tasks;
import com.google.gson.JsonPrimitive;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
@@ -34,7 +30,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import java.util.function.Supplier;
@@ -67,29 +62,4 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
config.get(),
Flux.just(json).map(JsonPrimitive::new));
}
-
- /**
- *
- * Does not work reactive version with DMaaP MR - to be investigated why in future
- * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
- * */
- @Override
- public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
- String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
- try (DefaultHttpClient httpClient = new DefaultHttpClient()) {
- HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl());
- try {
- StringEntity input = new StringEntity(json);
- input.setContentType(config.get().contentType());
- postRequest.setEntity(input);
- HttpResponse response = httpClient.execute(postRequest);
- return Mono.just(response);
- } catch (Exception e) {
- LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
- return Mono.error(e);
- }
- }
- }
-
-
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 4b3436e5..8aad3eed 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -20,19 +20,22 @@
package org.onap.dcaegen2.services.prh.tasks;
-import org.apache.http.HttpResponse;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClientResponse;
import javax.net.ssl.SSLException;
import java.util.Map;
@@ -116,7 +119,7 @@ public class ScheduledTasks {
.flatMap(this::processAdditionalFields)
.doOnError(exception ->
LOGGER.warn("BBSActionsTask exception has been registered: ", exception))
- .flatMap(this::publishToDmaapConfigurationWithApache)
+ .flatMap(this::publishToDmaapConfiguration)
.doOnError(exception ->
LOGGER.warn("DMaaPProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
@@ -134,27 +137,15 @@ public class ScheduledTasks {
LOGGER.info("PRH tasks have been completed");
}
- /**
- * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
- */
- @Deprecated
- private void onSuccess(HttpClientResponse response) {
- String statusCode = Integer.toString(response.status().code());
- MDC.put(RESPONSE_CODE, statusCode);
- LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- statusCode);
- MDC.remove(RESPONSE_CODE);
- }
-
- private void onSuccess(HttpResponse response) {
- String statusCode = Integer.toString(response.getStatusLine().getStatusCode());
- MDC.put(RESPONSE_CODE, statusCode);
- LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- statusCode);
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ String statusCodeOk = HttpStatus.OK.name();
+ MDC.put(RESPONSE_CODE, statusCodeOk);
+ LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
+ MDC.remove(RESPONSE_CODE);
+ }
}
-
private void onError(Throwable throwable) {
if (!(throwable instanceof DmaapEmptyResponseException)) {
LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
@@ -206,10 +197,6 @@ public class ScheduledTasks {
return bbsActionsTask.execute(state.DmaapModel).map(x -> state);
}
- /**
- * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
- */
- @Deprecated
private Flux<MessageRouterPublishResponse>
publishToDmaapConfiguration(final State state) {
try {
@@ -224,20 +211,6 @@ public class ScheduledTasks {
}
}
- private Mono<org.apache.http.HttpResponse>
- publishToDmaapConfigurationWithApache(final State state) {
- try {
- if (state.ActivationStatus) {
- LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
- return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel);
- }
-
- return dmaapReadyProducerTask.executeWithApache(state.DmaapModel);
- } catch (Exception e) {
- return Mono.error(e);
- }
- }
-
private Predicate<Throwable> resumePrhPredicate() {
return exception -> exception instanceof PrhTaskException;
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksTest.java
index 9a3099c5..9acbadd7 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksTest.java
@@ -19,7 +19,6 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
-import org.apache.http.HttpResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -28,6 +27,7 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.integration.junit5.mockito.MockitoExtension;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -38,7 +38,9 @@ import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
public class ScheduledTasksTest {
@@ -151,20 +153,20 @@ public class ScheduledTasksTest {
verifyThatPnfModelWasSentDmaapPnfUpdateTopic();
}
- private void verifyThatPnfModelWasNotSentDmaapPnfReadyTopic() {
- verify(readyPublisher, never()).executeWithApache(DMAAP_MODEL);
+ private void verifyThatPnfModelWasNotSentDmaapPnfReadyTopic() throws PrhTaskException {
+ verify(readyPublisher, never()).execute(DMAAP_MODEL);
}
- private Mono<HttpResponse> verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic() {
- return verify(updatePublisher, never()).executeWithApache(DMAAP_MODEL);
+ private Flux<MessageRouterPublishResponse> verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic() throws PrhTaskException {
+ return verify(updatePublisher, never()).execute(DMAAP_MODEL);
}
- private void verifyThatPnfModelWasSentDmaapPnfReadyTopic() {
- verify(readyPublisher, atLeastOnce()).executeWithApache(DMAAP_MODEL);
+ private void verifyThatPnfModelWasSentDmaapPnfReadyTopic() throws PrhTaskException {
+ verify(readyPublisher, atLeastOnce()).execute(DMAAP_MODEL);
}
- private Mono<HttpResponse> verifyThatPnfModelWasSentDmaapPnfUpdateTopic() {
- return verify(updatePublisher, atLeastOnce()).executeWithApache(DMAAP_MODEL);
+ private Flux<MessageRouterPublishResponse> verifyThatPnfModelWasSentDmaapPnfUpdateTopic() throws PrhTaskException {
+ return verify(updatePublisher, atLeastOnce()).execute(DMAAP_MODEL);
}
private void verifyThatPnfUpdateWasNotSentToAai() throws PrhTaskException, SSLException {