summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java28
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java16
4 files changed, 36 insertions, 18 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 1d121b38..aed99747 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -51,9 +52,9 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
- .flatMap(this::getJsonParserMessage)
+ .flatMapMany(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
@@ -62,27 +63,30 @@ public class DmaapConsumerJsonParser {
: Mono.fromCallable(() -> new JsonParser().parse(message));
}
- private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
return jsonElement.isJsonObject()
- ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
+ ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
: getConsumerDmaapModelFromJsonArray(jsonElement);
}
- private Mono<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
return create(
- Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new)));
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
- return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+ JsonParser jsonParser = new JsonParser();
+ return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+ : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
}
- private Mono<ConsumerDmaapModel> create(Mono<JsonObject> jsonObject) {
+ private Flux<ConsumerDmaapModel> create(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP ->
- !containsHeader(monoJsonP) ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
- : transform(monoJsonP));
+ !containsHeader(monoJsonP) ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+ : transform(monoJsonP))
+ .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty());
}
private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index a6baf4a5..4cde2257 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -32,7 +33,7 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
+ abstract Flux<ConsumerDmaapModel> consume(Mono<String> message);
abstract DMaaPConsumerReactiveHttpClient resolveClient();
@@ -40,7 +41,7 @@ abstract class DmaapConsumerTask {
protected abstract DmaapConsumerConfiguration resolveConfiguration();
- protected abstract Mono<ConsumerDmaapModel> execute(String object);
+ protected abstract Flux<ConsumerDmaapModel> execute(String object);
WebClient buildWebClient() {
return new DMaaPReactiveWebClient().build();
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 341a229b..3a5f213c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -54,12 +55,12 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
}
@Override
- Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+ Flux<ConsumerDmaapModel> consume(Mono<String> message) {
return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
- public Mono<ConsumerDmaapModel> execute(String object) {
+ public Flux<ConsumerDmaapModel> execute(String object) {
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
LOGGER.debug("Method called with arg {}", object);
return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index de7837ec..08767428 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -26,6 +26,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
@@ -39,6 +40,7 @@ import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@@ -83,7 +85,13 @@ public class ScheduledTasks {
logger.warn("Nothing to consume from DMaaP")
)
.flatMap(this::publishToAaiConfiguration)
+ .doOnError(exception ->
+ logger.warn("AAIProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.flatMap(this::publishToDmaapConfiguration)
+ .doOnError(exception ->
+ logger.warn("DMaaPProducerTask exception has been registered: ", exception))
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.doOnTerminate(mainCountDownLatch::countDown)
.subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -113,8 +121,8 @@ public class ScheduledTasks {
}
- private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
- return Mono.defer(() -> {
+ private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Flux.defer(() -> {
MdcVariables.setMdcContextMap(mdcContextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
logger.info(INVOKE, "Init configs");
@@ -138,4 +146,8 @@ public class ScheduledTasks {
return Mono.error(e);
}
}
+
+ private Predicate<Throwable> resumePrhPredicate() {
+ return exception -> exception instanceof PrhTaskException;
+ }
}