summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLusheng Ji <lji@research.att.com>2018-08-21 15:10:01 +0000
committerGerrit Code Review <gerrit@onap.org>2018-08-21 15:10:01 +0000
commit14df4b094cc2701b10d5b93274e5d890e1967b53 (patch)
tree81bdeec2f7668d146f482c7f10cd292163f743f6
parentf35d60c7093dcb3493e1d5eaf759eecce45069ff (diff)
parent76d0e26cf99bf932b9e1aac3f7a6bb231d8bffe7 (diff)
Merge "Second part loading PRH CONF"
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java1
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java83
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java49
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java15
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java99
-rw-r--r--prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java2
8 files changed, 231 insertions, 27 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index 5fe7c60e..2357e1d2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -27,7 +27,6 @@ import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
index d83c8138..c5c77ec2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
@@ -21,7 +21,6 @@
package org.onap.dcaegen2.services.prh.configuration;
import java.util.Optional;
-
import java.util.function.Predicate;
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
@@ -32,7 +31,6 @@ import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguratio
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
/**
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java
new file mode 100644
index 00000000..808c4a54
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableAaiClientConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguration;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18
+ */
+class CloudConfigParser {
+
+ private final JsonObject jsonObject;
+
+ CloudConfigParser(JsonObject jsonObject) {
+ this.jsonObject = jsonObject;
+ }
+
+ DmaapPublisherConfiguration getDmaapPublisherConfig() {
+ return new ImmutableDmaapPublisherConfiguration.Builder()
+ .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
+ .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
+ .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
+ .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
+ .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+ .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
+ .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
+ .build();
+ }
+
+ AaiClientConfiguration getAaiClientConfig() {
+ return new ImmutableAaiClientConfiguration.Builder()
+ .aaiHost(jsonObject.get("aai.aaiClientConfiguration.aaiHost").getAsString())
+ .aaiPort(jsonObject.get("aai.aaiClientConfiguration.aaiHostPortNumber").getAsInt())
+ .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString())
+ .aaiPnfPath(jsonObject.get("aai.aaiClientConfiguration.aaiPnfPath").getAsString())
+ .aaiIgnoreSslCertificateErrors(
+ jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean())
+ .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString())
+ .aaiProtocol(jsonObject.get("aai.aaiClientConfiguration.aaiProtocol").getAsString())
+ .aaiBasePath(jsonObject.get("aai.aaiClientConfiguration.aaiBasePath").getAsString())
+ .build();
+ }
+
+ DmaapConsumerConfiguration getDmaapConsumerConfig() {
+ return new ImmutableDmaapConsumerConfiguration.Builder()
+ .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt())
+ .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
+ .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
+ .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
+ .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
+ .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
+ .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
+ .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
+ .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
+ .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+ .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+ .build();
+ }
+} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
index 5b5c038b..82017a9d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
@@ -20,7 +20,12 @@
package org.onap.dcaegen2.services.prh.configuration;
+import com.google.gson.JsonObject;
+import java.util.Optional;
import java.util.Properties;
+import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.model.EnvProperties;
import org.onap.dcaegen2.services.prh.service.HttpClientExecutorService;
import org.slf4j.Logger;
@@ -46,6 +51,10 @@ public class CloudConfiguration extends AppConfig {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private HttpClientExecutorService httpClientExecutorService;
+ private AaiClientConfiguration aaiClientCloudConfiguration;
+ private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
+ private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
+
TaskScheduler cloudTaskScheduler;
@Value("#{systemEnvironment}")
@@ -62,17 +71,45 @@ public class CloudConfiguration extends AppConfig {
protected void runTask() {
Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
.subscribeOn(Schedulers.parallel())
- .subscribe(this::doOnSucces, this::doOnError);
+ .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+ }
+
+ private void parsingConfigError(Throwable throwable) {
+ logger.warn("Error in case of processing system environment, more details below: ", throwable);
}
- private void doOnError(Throwable throwable) {
- logger.warn("Error in case of processing system environment.%nMore details below:%n ", throwable);
+ private void cloudConfigError(Throwable throwable) {
+ logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
}
- private void doOnSucces(EnvProperties envProperties) {
+ private void parsingConfigSuccess(EnvProperties envProperties) {
logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
Flux.just(httpClientExecutorService.callConsulForConfigBindingServiceEndpoint(envProperties))
- .flatMap(configBindingServiceUri -> httpClientExecutorService.callConfigBindingServiceForPrhConfiguration(envProperties,
- configBindingServiceUri)).subscribe();
+ .flatMap(configBindingServiceUri -> httpClientExecutorService
+ .callConfigBindingServiceForPrhConfiguration(envProperties,
+ configBindingServiceUri)).subscribe(this::parseCloudConfig, this::cloudConfigError);
+ }
+
+ private void parseCloudConfig(JsonObject jsonObject) {
+ logger.info("Received application configuration: {}", jsonObject);
+ CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
+ dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
+ aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig();
+ dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
+ }
+
+ @Override
+ public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+ }
+
+ @Override
+ public AaiClientConfiguration getAaiClientConfiguration() {
+ return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration());
+ }
+
+ @Override
+ public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
index d1905108..d3b6cbb3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
@@ -34,13 +34,14 @@ import reactor.core.publisher.Flux;
*/
class EnvironmentProcessor {
+ private static final int DEFAULT_CONSUL_PORT = 8500;
private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
private EnvironmentProcessor() {
}
static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
- logger.info("Loading configuration from system environment variables");
+ logger.info("Loading configuration from system environment variables {}", systemEnvironment);
EnvProperties envProperties;
try {
envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
@@ -78,8 +79,8 @@ class EnvironmentProcessor {
private static Integer getDefaultPortOfConsul() {
logger.warn("$CONSUL_PORT environment has not been defined");
- logger.warn("$CONSUL_PORT variable will be set to default port {}", 8500);
- return 8500;
+ logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+ return DEFAULT_CONSUL_PORT;
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
index 1bce1c07..8782a180 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
@@ -26,7 +26,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,10 +33,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.support.PeriodicTrigger;
import reactor.core.publisher.Mono;
/**
@@ -49,7 +46,7 @@ public class SchedulerConfig extends CloudConfiguration {
private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 2000;
private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 1;
- private static volatile List<ScheduledFuture> scheduledPrgTaskFutureList = new ArrayList<>();
+ private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
private final ConcurrentTaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
@@ -68,8 +65,8 @@ public class SchedulerConfig extends CloudConfiguration {
*/
@ApiOperation(value = "Get response on stopping task execution")
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
- scheduledPrgTaskFutureList.forEach(x -> x.cancel(false));
- scheduledPrgTaskFutureList.clear();
+ scheduledPrhTaskFutureList.forEach(x -> x.cancel(false));
+ scheduledPrhTaskFutureList.clear();
return Mono.defer(() ->
Mono.just(new ResponseEntity<>("PRH Service has already been stopped!", HttpStatus.CREATED))
);
@@ -84,11 +81,11 @@ public class SchedulerConfig extends CloudConfiguration {
@PostConstruct
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
- if (scheduledPrgTaskFutureList.isEmpty()) {
- scheduledPrgTaskFutureList.add(cloudTaskScheduler
+ if (scheduledPrhTaskFutureList.isEmpty()) {
+ scheduledPrhTaskFutureList.add(cloudTaskScheduler
.scheduleAtFixedRate(super::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
- scheduledPrgTaskFutureList.add(taskScheduler
+ scheduledPrhTaskFutureList.add(taskScheduler
.scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY_FOR_PRH_TASKS));
return true;
} else {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java
index 01081f4d..1b69f5fd 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java
@@ -20,9 +20,19 @@
package org.onap.dcaegen2.services.prh.service;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.prh.model.EnvProperties;
import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
@@ -31,17 +41,96 @@ import org.springframework.stereotype.Service;
@Service
public class HttpClientExecutorService {
- public String callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
- return null;
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ public Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
+
+ return HttpGetClient.callHttpGet(
+ envProperties.consulHost() + ":" + envProperties.consulPort() + "/v1/catalog/service/" + envProperties
+ .cbsName())
+ .flatMap(this::getJsonArrayFromRequest)
+ .flatMap(jsonArray -> Mono.just(jsonArray.get(0)))
+ .flatMap(this::createConfigBindingServiceURL);
+
+ }
+
+ public Publisher<JsonObject> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties,
+ Mono<String> configBindingServiceUri) {
+ return HttpGetClient.callHttpGet(configBindingServiceUri + "/service_component/" + envProperties.appName())
+ .flatMap(this::getJsonConfiguration);
+ }
+
+ private Mono<? extends JsonObject> getJsonConfiguration(String body) {
+ JsonElement jsonElement = new Gson().toJsonTree(body);
+ try {
+ return Mono.just(jsonElement.getAsJsonObject());
+ } catch (IllegalStateException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<String> createConfigBindingServiceURL(JsonElement jsonElement) {
+ JsonObject jsonObject;
+ try {
+ jsonObject = jsonElement.getAsJsonObject();
+ } catch (IllegalStateException e) {
+ return Mono.error(e);
+ }
+ return Mono.just(jsonObject.get("ServiceAddress").toString() + ":" + jsonObject.get("ServicePort").toString());
}
- public Publisher<String> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties,
- String configBindingServiceUri) {
- return null;
+ private Mono<? extends JsonArray> getJsonArrayFromRequest(String body) {
+ JsonElement jsonElement = new Gson().toJsonTree(body);
+ try {
+ return Mono.just(jsonElement.getAsJsonArray());
+ } catch (IllegalStateException e) {
+ logger.warn("Converting string to jsonArray threw error: " + e);
+ return Mono.error(e);
+ }
}
private static class HttpGetClient {
+ private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+
+ private static WebClient webClient;
+
+ private HttpGetClient() {
+ }
+
+ private static Mono<String> callHttpGet(String url) {
+ return webClient
+ .get()
+ .uri(url)
+ .retrieve()
+ .onStatus(HttpStatus::is4xxClientError, response ->
+ Mono.error(new Exception("Request for cloud config failed: HTTP 400")))
+ .onStatus(HttpStatus::is5xxServerError, response ->
+ Mono.error(new Exception("Request for cloud config failed: HTTP 500")))
+ .bodyToMono(String.class);
+ }
+
+ private static ExchangeFilterFunction logResponse() {
+ return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+ logger.info("Response status {}", clientResponse.statusCode());
+ return Mono.just(clientResponse);
+ });
+ }
+
+ private static ExchangeFilterFunction logRequest() {
+ return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+ logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+ clientRequest.headers()
+ .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+ return Mono.just(clientRequest);
+ });
+ }
+
+ static {
+ webClient = WebClient.builder().filter(logRequest()).filter(logResponse()).build();
+ }
+
+
}
}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index 07388478..eb62d3cb 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -71,7 +71,7 @@ public class DMaaPProducerReactiveHttpClient {
.body(BodyInserters.fromObject(consumerDmaapModelMono))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
+ Mono.error(new Exception("HTTP 400"))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
Mono.error(new Exception("HTTP 500")))