summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-26 15:15:03 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-07 08:21:02 +0200
commit8b1502fb0f1af5d00ec26e712e57b792fbd16bd8 (patch)
tree7cc80c278f17710863e6d865df77c5edfa6d4fbc /prh-app-server/src/main/java
parente17c2d89d0470501fa60ed487726b0bbf3305f8c (diff)
Added dmaapReactiveConsumer
*Tests have not been ready yet Change-Id: I2e1d9c4218f91ae2f066b28acdbaa1870d7d27e7 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java40
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java30
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java36
4 files changed, 66 insertions, 49 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index ee42ce4a..20ec78fc 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -26,11 +26,12 @@ import java.util.Optional;
import java.util.stream.StreamSupport;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -46,19 +47,29 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException {
- JsonElement jsonElement = new JsonParser().parse(message);
- Optional<ConsumerDmaapModel> consumerDmaapModel;
- if (jsonElement.isJsonObject()) {
- consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
- } else {
- consumerDmaapModel = Optional
- .of(create(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
- }
- logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
- return consumerDmaapModel;
+ public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ return monoMessage.flatMap(message ->
+ {
+ if (!StringUtils.isEmpty(message)) {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel;
+ try {
+ if (jsonElement.isJsonObject()) {
+ consumerDmaapModel = create(jsonElement.getAsJsonObject());
+ } else {
+ consumerDmaapModel = create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
+ }
+ logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+ return Mono.just(consumerDmaapModel);
+ } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
+ return Mono.error(e);
+ }
+ }
+ return Mono.error(new DmaapEmptyResponseException());
+ });
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -101,5 +112,4 @@ public class DmaapConsumerJsonParser {
private boolean containsHeader(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS);
}
-
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 1be3b28d..d238b34c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -21,18 +21,19 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
abstract class DmaapConsumerTask {
- abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
+ abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
- abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
+ abstract DmaapConsumerReactiveHttpClient resolveClient();
abstract void initConfigs();
- protected abstract ConsumerDmaapModel execute(String object) throws PrhTaskException;
+ protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 3944d416..564a7a41 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -23,15 +23,17 @@ import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -41,8 +43,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
- private ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
@Autowired
public DmaapConsumerTaskImpl(AppConfig prhAppConfig) {
@@ -57,18 +59,18 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
- ConsumerDmaapModel consume(String message) throws PrhTaskException {
- logger.info("Consumed model from DMaaP: {}", message);
- return dmaapConsumerJsonParser.getJsonObject(message)
- .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request"));
+ Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+ logger.info("Consumed model from DmaaP: {}", message);
+ return dmaapConsumerJsonParser.getJsonObject(message);
}
+
@Override
- public ConsumerDmaapModel execute(String object) throws PrhTaskException {
- extendedDmaapConsumerHttpClient = resolveClient();
+ public Mono<ConsumerDmaapModel> execute(String object) {
+ dmaapConsumerReactiveHttpClient = resolveClient();
+// dmaapConsumerReactiveHttpClient.initWebClient();
logger.trace("Method called with arg {}", object);
- return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() ->
- new PrhTaskException("DMaaPConsumerTask has returned null"))));
+ return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne()));
}
@Override
@@ -81,8 +83,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
}
@Override
- ExtendedDmaapConsumerHttpClientImpl resolveClient() {
- return Optional.ofNullable(extendedDmaapConsumerHttpClient)
- .orElseGet(() -> new ExtendedDmaapConsumerHttpClientImpl(resolveConfiguration()));
+ DmaapConsumerReactiveHttpClient resolveClient() {
+ return Optional.ofNullable(dmaapConsumerReactiveHttpClient)
+ .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration()));
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index cf096b7b..37b8686e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -55,7 +55,7 @@ public class ScheduledTasks {
Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
.doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .flatMap(this::publishToAAIConfiguration)
+ .map(this::publishToAAIConfiguration)
.flatMap(this::publishToDMaaPConfiguration)
.subscribeOn(Schedulers.elastic());
@@ -76,7 +76,7 @@ public class ScheduledTasks {
}
}
- private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
return () ->
{
dmaapConsumerTask.initConfigs();
@@ -84,21 +84,25 @@ public class ScheduledTasks {
};
}
- private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
- try {
- return Mono.just(aaiProducerTask.execute(dmaapModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in A&AIProducer task ", e);
- return Mono.error(e);
- }
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ return monoDMaaPModel.flatMap(dmaapModel -> {
+ try {
+ return Mono.just(aaiProducerTask.execute(dmaapModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in A&AIProducer task ", e);
+ return Mono.error(e);
+ }
+ });
}
- private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
- try {
- return Mono.just(dmaapProducerTask.execute(aaiModel));
- } catch (PrhTaskException e) {
- logger.warn("Exception in DMaaPProducer task ", e);
- return Mono.error(e);
- }
+ private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
+ return monoAAIModel.flatMap(aaiModel -> {
+ try {
+ return Mono.just(dmaapProducerTask.execute(aaiModel));
+ } catch (PrhTaskException e) {
+ logger.warn("Exception in DMaaPProducer task ", e);
+ return Mono.error(e);
+ }
+ });
}
}