aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org
diff options
context:
space:
mode:
authorpwielebs <piotr.wielebski@nokia.com>2019-03-07 10:47:59 +0100
committerMarcin Migdal <marcin.migdal@nokia.com>2019-03-20 11:59:16 +0100
commit295746ec486e0300e7d5958ba44f8054c30389f4 (patch)
tree3b43650f5878d00af3193a82d8394b1e0fcac1c0 /prh-app-server/src/main/java/org
parentd824f1434f7ea71f1efdf3e9a667f6724ae600d3 (diff)
Integrate PRH with SDK in version 1.1.4
- AAI client aligned - CBS client aligned - DmaaP client aligned Change-Id: I7afd0a44572e1097be5c3a4acc7221c7923cea8b Issue-ID: DCAEGEN2-1319 Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java23
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java37
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java19
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java15
10 files changed, 74 insertions, 64 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index 3967dc0a..5ea07e0a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -25,6 +25,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.REQUEST_
import java.util.Map;
import java.util.UUID;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationClient;
import org.slf4j.MDC;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -58,4 +59,10 @@ public class MainApp {
TaskScheduler concurrentTaskScheduler() {
return new ConcurrentTaskScheduler();
}
+
+
+ @Bean
+ CloudConfigurationClient getCloudConfigurationClient(){
+ return new CloudConfigurationClient();
+ }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
index 08c99621..d2849500 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
@@ -23,7 +23,8 @@ package org.onap.dcaegen2.services.prh.configuration;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -60,7 +61,6 @@ public class CloudConfiguration extends AppConfig {
@Value("#{systemEnvironment}")
private Properties systemEnvironment;
-
@Autowired
public void setThreadPoolTaskScheduler(CloudConfigurationClient prhConfigurationProvider) {
this.prhConfigurationProvider = prhConfigurationProvider;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
index 793fcc27..3d765bde 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
@@ -21,8 +21,8 @@
package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 00a6d465..a69b7c54 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.prh.service;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
@@ -77,26 +78,14 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonArray> monoMessage) {
return monoMessage
- .flatMapMany(this::getJsonParserMessage)
- .flatMap(this::createJsonConsumerModel);
+ .flatMapMany(this::getConsumerDmaapModelFromJsonArray);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty")
- : Mono.fromCallable(() -> new JsonParser().parse(message));
- }
-
- private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject()
- ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getConsumerDmaapModelFromJsonArray(jsonElement);
- }
-
- private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray jsonElement) {
return create(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.spliterator(), false)
.map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
.orElseGet(JsonObject::new)))));
}
@@ -115,7 +104,6 @@ public class DmaapConsumerJsonParser {
}
private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {
-
JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT)
.getAsJsonObject(COMMON_EVENT_HEADER);
JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT)
@@ -123,7 +111,6 @@ public class DmaapConsumerJsonParser {
this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE);
-
this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS);
this.pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS);
this.pnfSerialNumberOptionalField = getValueFromJson(pnfRegistrationFields, SERIAL_NUMBER);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
index 8e31807a..705b085b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
@@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiReactiveWebClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient;
+
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@@ -40,14 +42,14 @@ public abstract class AaiProducerTask {
abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException;
- abstract AaiReactiveHttpPatchClient resolveClient() throws SSLException;
+ abstract AaiHttpPatchClient resolveClient();
protected abstract AaiClientConfiguration resolveConfiguration();
protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
throws PrhTaskException, SSLException;
- WebClient buildWebClient() throws SSLException {
- return new AaiReactiveWebClientFactory(new SslFactory(), resolveConfiguration()).build();
+ CloudHttpClient buildHttpClient() {
+ return new AaiHttpClientFactory(resolveConfiguration()).build();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index 355ca905..a34549d8 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -20,22 +20,22 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl;
import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient;
-
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClientResponse;
/**
@@ -47,7 +47,7 @@ public class AaiProducerTaskImpl extends AaiProducerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class);
private final Config config;
- private AaiReactiveHttpPatchClient aaiReactiveHttpPatchClient;
+ private AaiHttpPatchClient aaiHttpPatchClient;
@Autowired
public AaiProducerTaskImpl(Config config) {
@@ -56,20 +56,20 @@ public class AaiProducerTaskImpl extends AaiProducerTask {
@Override
Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
- LOGGER.info("Publish to AAI DmaapModel");
- return aaiReactiveHttpPatchClient.getAaiProducerResponse(consumerDmaapModel)
- .flatMap(response -> {
- if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) {
- return Mono.just(consumerDmaapModel);
- }
- return Mono
- .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
- });
+ Mono<HttpClientResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel);
+ return resposne.flatMap(response -> {
+ if (HttpUtils.isSuccessfulResponseCode(response.status().code())) {
+ return Mono.just(consumerDmaapModel);
+ }
+ return Mono
+ .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.status().code()));
+ });
}
@Override
- AaiReactiveHttpPatchClient resolveClient() throws SSLException {
- return new AaiReactiveHttpPatchClient(resolveConfiguration(), new AaiJsonBodyBuilderImpl()).createAaiWebClient(buildWebClient());
+ AaiHttpPatchClient resolveClient() {
+ return new AaiHttpPatchClient(resolveConfiguration(),
+ new AaiJsonBodyBuilderImpl()).createAaiHttpClient(new AaiHttpClientFactory(resolveConfiguration()).build());
}
@Override
@@ -78,12 +78,11 @@ public class AaiProducerTaskImpl extends AaiProducerTask {
}
@Override
- protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
- throws PrhTaskException, SSLException {
+ protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- aaiReactiveHttpPatchClient = resolveClient();
+ aaiHttpPatchClient = resolveClient();
LOGGER.debug("Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index fd7bca1e..69246090 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,6 +20,9 @@
package org.onap.dcaegen2.services.prh.tasks;
+import com.google.gson.JsonArray;
+import java.util.Objects;
+import java.util.Optional;
import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
@@ -69,7 +73,10 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
LOGGER.debug("Method called with arg {}", object);
- return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
+
+ Mono<JsonArray> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(
+ Optional.empty());
+ return dmaapConsumerJsonParser.getJsonObject(response);
}
@Override
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 4d6c0f87..ec8ffaff 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -20,11 +20,11 @@
package org.onap.dcaegen2.services.prh.tasks;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-
-import org.springframework.http.ResponseEntity;
+import reactor.netty.http.client.HttpClientResponse;
import reactor.core.publisher.Mono;
/**
@@ -32,7 +32,7 @@ import reactor.core.publisher.Mono;
*/
interface DmaapPublisherTask {
- Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+ Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException;
- DMaaPPublisherReactiveHttpClient resolveClient();
+ DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 49accdd4..85b18b8a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,11 +20,14 @@
package org.onap.dcaegen2.services.prh.tasks;
+import java.util.Optional;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
@@ -32,8 +35,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.produce
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.netty.http.client.HttpClientResponse;
import reactor.core.publisher.Mono;
/**
@@ -43,7 +46,8 @@ import reactor.core.publisher.Mono;
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Config config;
+ private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+
private final PublisherReactiveHttpClientFactory httpClientFactory;
@Autowired
@@ -52,22 +56,23 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
}
DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) {
- this.config = config;
+ this.dmaapPublisherConfiguration = config.getDmaapPublisherConfiguration();
this.httpClientFactory = httpClientFactory;
}
@Override
- public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException,SSLException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+ return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel,Optional.empty());
}
@Override
- public DMaaPPublisherReactiveHttpClient resolveClient() {
- return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+ public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException{
+ return httpClientFactory.create(dmaapPublisherConfiguration);
+
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 2924225b..7ecf4a6e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -39,10 +39,10 @@ import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClientResponse;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -108,10 +108,13 @@ public class ScheduledTasks {
logger.info("PRH tasks have been completed");
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpClientResponse response) {
+
+ String statusCode = Integer.toString(response.status().code());
+
+ MDC.put(RESPONSE_CODE, statusCode);
logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- responseCode.getStatusCode().value());
+ statusCode);
MDC.remove(RESPONSE_CODE);
}
@@ -148,10 +151,10 @@ public class ScheduledTasks {
}
}
- private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
+ private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);
- } catch (PrhTaskException e) {
+ } catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}