summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java28
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java14
5 files changed, 50 insertions, 39 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 5cd30f8b..6db36a8a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.prh.tasks;
import java.util.Optional;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Mono;
/**
@@ -32,7 +32,7 @@ abstract class DmaapConsumerTask {
abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
- abstract DmaapConsumerReactiveHttpClient resolveClient();
+ abstract DMaaPConsumerReactiveHttpClient resolveClient();
abstract void initConfigs();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 08008f0a..90382e51 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -23,11 +23,10 @@ import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,7 +42,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
@Autowired
public DmaapConsumerTaskImpl(AppConfig prhAppConfig) {
@@ -58,16 +57,15 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
Mono<ConsumerDmaapModel> consume(Mono<String> message) {
- logger.info("Consumed model from DmaaP: {}", message);
+ logger.info("Consumed model from DMaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
public Mono<ConsumerDmaapModel> execute(String object) {
- dmaapConsumerReactiveHttpClient = resolveClient();
- dmaapConsumerReactiveHttpClient.initWebClient();
+ dMaaPConsumerReactiveHttpClient = resolveClient();
logger.trace("Method called with arg {}", object);
- return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
+ return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
}
@Override
@@ -80,8 +78,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
}
@Override
- DmaapConsumerReactiveHttpClient resolveClient() {
- return Optional.ofNullable(dmaapConsumerReactiveHttpClient)
- .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration()));
+ DMaaPConsumerReactiveHttpClient resolveClient() {
+
+ return Optional.ofNullable(dMaaPConsumerReactiveHttpClient)
+ .orElseGet(() -> {
+ DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration();
+ return new DMaaPConsumerReactiveHttpClient(dmaapConsumerConfiguration).createDMaaPWebClient(
+ new DMaaPReactiveWebClient.WebClientBuilder()
+ .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType())
+ .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName())
+ .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build());
+ });
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 3520d134..27670e29 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -21,16 +21,17 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
abstract class DmaapPublisherTask {
- abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+ abstract Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
- abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
+ abstract DMaaPProducerReactiveHttpClient resolveClient();
- protected abstract Integer execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+ protected abstract Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 7cbeb3b3..faf43bc7 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -25,12 +25,13 @@ import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -40,7 +41,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
- private ExtendedDmaapProducerHttpClientImpl extendedDmaapProducerHttpClient;
+ private DMaaPProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
@Autowired
public DmaapPublisherTaskImpl(AppConfig prhAppConfig) {
@@ -48,19 +49,17 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
- logger.info("Publishing on DmaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
+ Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
+ logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
consumerDmaapModel);
- return extendedDmaapProducerHttpClient.getHttpProducerResponse(consumerDmaapModel)
- .filter(response -> response == HttpStatus.OK.value())
- .orElseThrow(() -> new DmaapNotFoundException("Incorrect response from Dmaap"));
+ return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel).map(Integer::parseInt);
}
@Override
- public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
- .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to Dmaap task"));
- extendedDmaapProducerHttpClient = resolveClient();
+ .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to DMaaP task"));
+ dMaaPProducerReactiveHttpClient = resolveClient();
logger.trace("Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
@@ -70,8 +69,15 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
}
@Override
- ExtendedDmaapProducerHttpClientImpl resolveClient() {
- return Optional.ofNullable(extendedDmaapProducerHttpClient)
- .orElseGet(() -> new ExtendedDmaapProducerHttpClientImpl(resolveConfiguration()));
+ DMaaPProducerReactiveHttpClient resolveClient() {
+ return Optional.ofNullable(dMaaPProducerReactiveHttpClient)
+ .orElseGet(() -> {
+ DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration();
+ return new DMaaPProducerReactiveHttpClient(dmaapPublisherConfiguration).createDMaaPWebClient(
+ new DMaaPReactiveWebClient.WebClientBuilder()
+ .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType())
+ .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName())
+ .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build());
+ });
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index e161e3c5..2787e64b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -97,13 +97,11 @@ public class ScheduledTasks {
}
private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
- return monoAAIModel.flatMap(aaiModel -> {
- try {
- return Mono.just(dmaapProducerTask.execute(aaiModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in DMaaPProducer task ", e);
- return Mono.error(e);
- }
- });
+ try {
+ return dmaapProducerTask.execute(monoAAIModel);
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
}
}