From c8c9a242f7a1f8454e2cf94b0442128533569dc5 Mon Sep 17 00:00:00 2001 From: wasala Date: Tue, 26 Jun 2018 19:29:43 +0200 Subject: DmaapConsumerReactive fixed tests Change-Id: I888ef94a084f32a18c77c12a18fb6636a4f33649 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala --- .../prh/service/DmaapConsumerJsonParser.java | 21 +++++------ .../services/prh/tasks/DmaapConsumerTask.java | 5 +-- .../services/prh/tasks/DmaapConsumerTaskImpl.java | 9 +++-- .../services/prh/tasks/ScheduledTasks.java | 7 ++-- .../org/onap/dcaegen2/services/prh/tasks/Task.java | 42 ++++++++++++++++++++++ 5 files changed, 64 insertions(+), 20 deletions(-) create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java (limited to 'prh-app-server/src/main/java') diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 20ec78fc..22acf547 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -30,7 +30,6 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; /** @@ -47,20 +46,21 @@ public class DmaapConsumerJsonParser { private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber"; - public Mono getJsonObject(Mono monoMessage) { + public Mono> getJsonObject(Mono> monoMessage) { return monoMessage.flatMap(message -> { - if (!StringUtils.isEmpty(message)) { - JsonElement jsonElement = new JsonParser().parse(message); - ConsumerDmaapModel consumerDmaapModel; + if (message.isPresent()) { + JsonElement jsonElement = new JsonParser().parse(message.orElse("")); + Optional consumerDmaapModel; try { if (jsonElement.isJsonObject()) { - consumerDmaapModel = create(jsonElement.getAsJsonObject()); + consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject())); } else { - consumerDmaapModel = create( - StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new)); + consumerDmaapModel = Optional + .of(create( + StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() + .flatMap(this::getJsonObjectFromAnArray) + .orElseThrow(DmaapEmptyResponseException::new))); } logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel); return Mono.just(consumerDmaapModel); @@ -112,4 +112,5 @@ public class DmaapConsumerJsonParser { private boolean containsHeader(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS); } + } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index d238b34c..753d1f9c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -19,6 +19,7 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import java.util.Optional; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient; @@ -29,11 +30,11 @@ import reactor.core.publisher.Mono; */ abstract class DmaapConsumerTask { - abstract Mono consume(Mono message) throws PrhTaskException; + abstract Mono> consume(Mono> message) throws PrhTaskException; abstract DmaapConsumerReactiveHttpClient resolveClient(); abstract void initConfigs(); - protected abstract Mono execute(String object) throws PrhTaskException; + protected abstract Mono> execute(String object) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 564a7a41..3181c069 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; @@ -41,6 +40,7 @@ import reactor.core.publisher.Mono; @Component public class DmaapConsumerTaskImpl extends DmaapConsumerTask { + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final Config prhAppConfig; private DmaapConsumerJsonParser dmaapConsumerJsonParser; @@ -59,18 +59,17 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override - Mono consume(Mono message) { + Mono> consume(Mono> message) throws PrhTaskException { logger.info("Consumed model from DmaaP: {}", message); return dmaapConsumerJsonParser.getJsonObject(message); } - @Override - public Mono execute(String object) { + public Mono> execute(String object) throws PrhTaskException { dmaapConsumerReactiveHttpClient = resolveClient(); // dmaapConsumerReactiveHttpClient.initWebClient(); logger.trace("Method called with arg {}", object); - return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne())); + return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse())); } @Override diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 37b8686e..6fa986e4 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -19,6 +19,7 @@ */ package org.onap.dcaegen2.services.prh.tasks; +import java.util.Optional; import java.util.concurrent.Callable; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; @@ -76,7 +77,7 @@ public class ScheduledTasks { } } - private Callable> consumeFromDMaaPMessage() { + private Callable>> consumeFromDMaaPMessage() { return () -> { dmaapConsumerTask.initConfigs(); @@ -84,10 +85,10 @@ public class ScheduledTasks { }; } - private Mono publishToAAIConfiguration(Mono monoDMaaPModel) { + private Mono publishToAAIConfiguration(Mono> monoDMaaPModel) { return monoDMaaPModel.flatMap(dmaapModel -> { try { - return Mono.just(aaiProducerTask.execute(dmaapModel)); + return Mono.just(aaiProducerTask.execute(dmaapModel.get())); } catch (PrhTaskException e) { logger.warn("Exception in A&AIProducer task ", e); return Mono.error(e); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java new file mode 100644 index 00000000..c26028a7 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; + +/** + * @author Przemysław Wąsala on 4/13/18 + */ + + +public abstract class Task { + + Task taskProcess; + + protected abstract void receiveRequest(R body) throws PrhTaskException; + + protected abstract S execute(R object) throws PrhTaskException; + + protected abstract C resolveConfiguration(); + + void setNext(Task task) { + this.taskProcess = task; + } +} -- cgit 1.2.3-korg