aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-26 19:29:43 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-07 08:32:28 +0200
commitc8c9a242f7a1f8454e2cf94b0442128533569dc5 (patch)
tree784673c75693f6f77b624e5c83c12e24b73646ec /prh-app-server/src/main/java/org
parent8b1502fb0f1af5d00ec26e712e57b792fbd16bd8 (diff)
DmaapConsumerReactive fixed tests
Change-Id: I888ef94a084f32a18c77c12a18fb6636a4f33649 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java21
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java7
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java42
5 files changed, 64 insertions, 20 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 20ec78fc..22acf547 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -30,7 +30,6 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
/**
@@ -47,20 +46,21 @@ public class DmaapConsumerJsonParser {
private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
- public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+ public Mono<Optional<ConsumerDmaapModel>> getJsonObject(Mono<Optional<String>> monoMessage) {
return monoMessage.flatMap(message ->
{
- if (!StringUtils.isEmpty(message)) {
- JsonElement jsonElement = new JsonParser().parse(message);
- ConsumerDmaapModel consumerDmaapModel;
+ if (message.isPresent()) {
+ JsonElement jsonElement = new JsonParser().parse(message.orElse(""));
+ Optional<ConsumerDmaapModel> consumerDmaapModel;
try {
if (jsonElement.isJsonObject()) {
- consumerDmaapModel = create(jsonElement.getAsJsonObject());
+ consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
} else {
- consumerDmaapModel = create(
- StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new));
+ consumerDmaapModel = Optional
+ .of(create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new)));
}
logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
return Mono.just(consumerDmaapModel);
@@ -112,4 +112,5 @@ public class DmaapConsumerJsonParser {
private boolean containsHeader(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS);
}
+
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index d238b34c..753d1f9c 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -19,6 +19,7 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.util.Optional;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
@@ -29,11 +30,11 @@ import reactor.core.publisher.Mono;
*/
abstract class DmaapConsumerTask {
- abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
+ abstract Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException;
abstract DmaapConsumerReactiveHttpClient resolveClient();
abstract void initConfigs();
- protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
+ protected abstract Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 564a7a41..3181c069 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
@@ -41,6 +40,7 @@ import reactor.core.publisher.Mono;
@Component
public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
+
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -59,18 +59,17 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
@Override
- Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+ Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException {
logger.info("Consumed model from DmaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
-
@Override
- public Mono<ConsumerDmaapModel> execute(String object) {
+ public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException {
dmaapConsumerReactiveHttpClient = resolveClient();
// dmaapConsumerReactiveHttpClient.initWebClient();
logger.trace("Method called with arg {}", object);
- return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne()));
+ return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
}
@Override
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 37b8686e..6fa986e4 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -19,6 +19,7 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.util.Optional;
import java.util.concurrent.Callable;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
@@ -76,7 +77,7 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
+ private Callable<Mono<Optional<ConsumerDmaapModel>>> consumeFromDMaaPMessage() {
return () ->
{
dmaapConsumerTask.initConfigs();
@@ -84,10 +85,10 @@ public class ScheduledTasks {
};
}
- private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<Optional<ConsumerDmaapModel>> monoDMaaPModel) {
return monoDMaaPModel.flatMap(dmaapModel -> {
try {
- return Mono.just(aaiProducerTask.execute(dmaapModel));
+ return Mono.just(aaiProducerTask.execute(dmaapModel.get()));
} catch (PrhTaskException e) {
logger.warn("Exception in A&AIProducer task ", e);
return Mono.error(e);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java
new file mode 100644
index 00000000..c26028a7
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
+ */
+
+
+public abstract class Task<R, S, C> {
+
+ Task taskProcess;
+
+ protected abstract void receiveRequest(R body) throws PrhTaskException;
+
+ protected abstract S execute(R object) throws PrhTaskException;
+
+ protected abstract C resolveConfiguration();
+
+ void setNext(Task task) {
+ this.taskProcess = task;
+ }
+}