diff options
Diffstat (limited to 'prh-app-server/src/main/java/org')
7 files changed, 41 insertions, 50 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 1d215c62..7516853e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -48,28 +48,30 @@ public class DmaapConsumerJsonParser { public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { - return monoMessage.flatMap(message -> - { - if (!StringUtils.isEmpty(message)) { - JsonElement jsonElement = new JsonParser().parse(message); - ConsumerDmaapModel consumerDmaapModel; - try { - if (jsonElement.isJsonObject()) { - consumerDmaapModel = create(jsonElement.getAsJsonObject()); - } else { - consumerDmaapModel = create( - StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() - .flatMap(this::getJsonObjectFromAnArray) - .orElseThrow(DmaapEmptyResponseException::new)); - } - logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel); - return Mono.just(consumerDmaapModel); - } catch (DmaapNotFoundException | DmaapEmptyResponseException e) { - return Mono.error(e); - } - } - return Mono.error(new DmaapEmptyResponseException()); - }); + return monoMessage + .flatMap(message -> (StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) + : convertJsonToConsumerDmaapModel(message))); + } + + private Mono<? extends ConsumerDmaapModel> convertJsonToConsumerDmaapModel(String message) { + try { + JsonElement jsonElement = new JsonParser().parse(message); + ConsumerDmaapModel consumerDmaapModel = jsonElement.isJsonObject() ? + create(jsonElement.getAsJsonObject()) : + getConsumerDmaapModelFromJsonArray(jsonElement); + logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel); + return Mono.just(consumerDmaapModel); + } catch (DmaapNotFoundException | DmaapEmptyResponseException e) { + return Mono.error(e); + } + } + + private ConsumerDmaapModel getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) + throws DmaapNotFoundException, DmaapEmptyResponseException { + return create( + StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst() + .flatMap(this::getJsonObjectFromAnArray) + .orElseThrow(DmaapEmptyResponseException::new)); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java index 005d08d1..b12fb5bb 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java @@ -26,8 +26,8 @@ import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; import org.onap.dcaegen2.services.prh.service.AAIProducerClient; -import org.onap.dcaegen2.services.prh.service.HttpUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index 8df564d2..93c287b4 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -43,10 +43,6 @@ abstract class DmaapConsumerTask { protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException; WebClient buildWebClient() { - DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration(); - return new DMaaPReactiveWebClient.WebClientBuilder() - .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType()) - .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName()) - .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build(); + return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 8c74bac3..45709aa2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -19,7 +19,6 @@ */ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Optional; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.prh.configuration.AppConfig; import org.onap.dcaegen2.services.prh.configuration.Config; @@ -79,8 +78,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override DMaaPConsumerReactiveHttpClient resolveClient() { - return Optional.ofNullable(dMaaPConsumerReactiveHttpClient) - .orElseGet(() -> new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient( - buildWebClient())); + return dMaaPConsumerReactiveHttpClient == null + ? new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient()) + : dMaaPConsumerReactiveHttpClient; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index af8d14a3..f559683d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -32,19 +32,15 @@ import reactor.core.publisher.Mono; */ abstract class DmaapPublisherTask { - abstract Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; abstract DMaaPProducerReactiveHttpClient resolveClient(); protected abstract DmaapPublisherConfiguration resolveConfiguration(); - protected abstract Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; + protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException; WebClient buildWebClient() { - DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration(); - return new DMaaPReactiveWebClient.WebClientBuilder() - .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType()) - .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName()) - .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build(); + return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 11281d81..673e00f3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -48,14 +48,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { + Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) { logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(), consumerDmaapModel); - return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel).map(Integer::parseInt); + return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); } @Override - public Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { + public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException { consumerDmaapModel = Optional.ofNullable(consumerDmaapModel) .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to DMaaP task")); dMaaPProducerReactiveHttpClient = resolveClient(); @@ -70,8 +70,8 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { @Override DMaaPProducerReactiveHttpClient resolveClient() { - return Optional.ofNullable(dMaaPProducerReactiveHttpClient) - .orElseGet(() -> new DMaaPProducerReactiveHttpClient(resolveConfiguration()) - .createDMaaPWebClient(buildWebClient())); + return dMaaPProducerReactiveHttpClient == null + ? new DMaaPProducerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient()) + : dMaaPProducerReactiveHttpClient; } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 2787e64b..365552b7 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -54,7 +54,7 @@ public class ScheduledTasks { public void scheduleMainPrhEventTask() { logger.trace("Execution of tasks was registered"); - Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) + Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) .map(this::publishToAAIConfiguration) .flatMap(this::publishToDMaaPConfiguration) @@ -67,7 +67,7 @@ public class ScheduledTasks { logger.info("PRH tasks have been completed"); } - private void onSuccess(Integer responseCode) { + private void onSuccess(String responseCode) { logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); } @@ -90,17 +90,15 @@ public class ScheduledTasks { try { return Mono.just(aaiProducerTask.execute(dmaapModel)); } catch (PrhTaskException e) { - logger.warn("Exception in A&AIProducer task ", e); return Mono.error(e); } }); } - private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) { + private Mono<String> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) { try { return dmaapProducerTask.execute(monoAAIModel); } catch (PrhTaskException e) { - logger.warn("Exception in DMaaPProducer task ", e); return Mono.error(e); } } |