From e1e4154a3914f0878e26ea2e1683ebdfe507f5a9 Mon Sep 17 00:00:00 2001 From: "sangeeta.bellara" Date: Thu, 9 Mar 2023 22:13:03 +0530 Subject: PRH Code Additions for Early PNF registrations Issue-ID: DCAEGEN2-3312 Change-Id: Id9b1ca83390af3675e26fc61ccc8d12611ab8ddf Signed-off-by: sangeeta.bellara Signed-off-by: sangeeta.bellara Change-Id: I9bc25bc1343c40aca5644de3fd30f7c2142c1a47 Signed-off-by: sangeeta.bellara --- prh-app-server/pom.xml | 8 +- .../services/prh/configuration/KafkaConfig.java | 96 ++++++++ .../prh/controllers/ScheduleController.java | 20 +- .../prh/service/DmaapConsumerJsonParser.java | 173 +++++++++----- .../services/prh/tasks/AaiProducerTask.java | 3 +- .../dcaegen2/services/prh/tasks/AaiQueryTask.java | 4 +- .../services/prh/tasks/AaiQueryTaskImpl.java | 27 ++- .../services/prh/tasks/ScheduledTasks.java | 51 ++-- .../services/prh/tasks/ScheduledTasksRunner.java | 9 +- .../prh/tasks/commit/EpochDateTimeConversion.java | 95 ++++++++ .../prh/tasks/commit/KafkaConsumerTask.java | 35 +++ .../prh/tasks/commit/KafkaConsumerTaskImpl.java | 99 ++++++++ .../commit/ScheduledTasksRunnerWithCommit.java | 99 ++++++++ .../prh/tasks/commit/ScheduledTasksWithCommit.java | 213 +++++++++++++++++ prh-app-server/src/main/resources/application.yaml | 13 +- .../prh/configuration/KafkaConfigTest.java | 56 +++++ .../prh/controllers/ScheduleControllerTest.java | 5 +- .../integration/PrhWorkflowIntegrationTest.java | 21 +- .../prh/service/DmaapConsumerJsonParserTest.java | 28 ++- .../services/prh/tasks/AaiQueryTaskImplTest.java | 24 +- .../tasks/commit/EpochDateTimeConversionTest.java | 52 ++++ .../tasks/commit/KafkaConsumerTaskImplTest.java | 86 +++++++ .../commit/ScheduledTasksRunnerWithCommitTest.java | 72 ++++++ .../tasks/commit/ScheduledTasksWithCommitTest.java | 263 +++++++++++++++++++++ prh-app-server/src/test/resources/application.yaml | 4 +- 25 files changed, 1433 insertions(+), 123 deletions(-) create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java create mode 100644 prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java create mode 100644 prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java create mode 100644 prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java create mode 100644 prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java create mode 100644 prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java (limited to 'prh-app-server') diff --git a/prh-app-server/pom.xml b/prh-app-server/pom.xml index 62e19dce..4de28bc8 100644 --- a/prh-app-server/pom.xml +++ b/prh-app-server/pom.xml @@ -5,6 +5,7 @@ ~ ================================================================================ ~ Copyright (C) 2018-2022 NOKIA Intellectual Property. All rights reserved. ~ Copyright (C) 2021 Samsung Electronics. All rights reserved. + ~ Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. ~ ================================================================================ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -27,7 +28,7 @@ org.onap.dcaegen2.services prh - 1.8.1-SNAPSHOT + 1.9.0-SNAPSHOT org.onap.dcaegen2.services.prh @@ -317,5 +318,10 @@ jersey-apache-connector test + + org.springframework.kafka + spring-kafka + 2.8.11 + diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java new file mode 100644 index 00000000..8affe281 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java @@ -0,0 +1,96 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Pravin Kokane on 3/13/23 + */ + +@Profile("autoCommitDisabled") +@EnableKafka +@Configuration +public class KafkaConfig +{ + String kafkaBoostrapServerConfig = System.getenv("kafkaBoostrapServerConfig"); + + String groupIdConfig = System.getenv("groupIdConfig"); + + + String kafkaSecurityProtocol = System.getenv("kafkaSecurityProtocol"); + + String kafkaSaslMechanism = System.getenv("kafkaSaslMechanism"); + + String kafkaUsername = System.getenv("kafkaUsername"); + + String kafkaPassword = System.getenv("kafkaPassword"); + + String kafkaJaasConfig = System.getenv("JAAS_CONFIG"); + + String kafkaLoginModuleClassConfig = System.getenv("Login_Module_Class"); + + @Bean + public ConsumerFactory consumerFactory() + { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaBoostrapServerConfig); + config.put(ConsumerConfig.GROUP_ID_CONFIG,groupIdConfig); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + if(kafkaJaasConfig == null) { + kafkaJaasConfig = kafkaLoginModuleClassConfig + " required username=\"" + + kafkaUsername + "\" password=\"" + kafkaPassword + "\";"; + } + if(kafkaSecurityProtocol==null ) kafkaSecurityProtocol="SASL_PLAINTEXT"; + config.put("security.protocol", kafkaSecurityProtocol); + if(kafkaSaslMechanism==null ) kafkaSaslMechanism="SCRAM-SHA-512"; + config.put("sasl.mechanism", kafkaSaslMechanism); + + config.put("sasl.jaas.config", kafkaJaasConfig); + + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() + { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index a0aa17e3..0b1f0e1c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +24,7 @@ package org.onap.dcaegen2.services.prh.controllers; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner; +import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -40,30 +42,34 @@ import reactor.core.publisher.Mono; @Api(value = "ScheduleController", description = "Schedule Controller") public class ScheduleController { + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class); - private final ScheduledTasksRunner scheduledTasksRunner; + private ScheduledTasksRunner scheduledTasksRunner; - @Autowired + + @Autowired(required = false) public ScheduleController(ScheduledTasksRunner scheduledTasksRunner) { this.scheduledTasksRunner = scheduledTasksRunner; } + + @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") public Mono> startTasks() { - LOGGER.trace("Receiving start scheduling worker request"); - return Mono.fromSupplier(scheduledTasksRunner::tryToStartTask).map(this::createStartTaskResponse); + return Mono.fromSupplier(scheduledTasksRunner::tryToStartTask).map(this::createStartTaskResponse); } + @RequestMapping(value = "stopPrh", method = RequestMethod.GET) @ApiOperation(value = "Receiving stop scheduling worker request") public Mono> stopTask() { LOGGER.trace("Receiving stop scheduling worker request"); return Mono.defer(() -> { - scheduledTasksRunner.cancelTasks(); - return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); - } + scheduledTasksRunner.cancelTasks(); + return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); + } ); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index f98e952f..25c380fb 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,43 +18,48 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.dcaegen2.services.prh.service; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.collection.List; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.configurationprocessor.json.JSONArray; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.boot.configurationprocessor.json.JSONObject; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.ArrayList; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; + + + /** * @author Przemysław Wąsala on 5/8/18 @@ -74,6 +80,8 @@ public class DmaapConsumerJsonParser { private String pnfSwVersionOptionalField; private JsonObject pnfAdditionalFields; + private String sourceName; + /** * Extract info from string and create @see {@link ConsumerDmaapModel}. * @@ -84,6 +92,11 @@ public class DmaapConsumerJsonParser { return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } + public JSONObject getJsonObjectKafka(String jsonStr) throws JSONException { + return new JSONObject(jsonStr); + } + + private Flux getConsumerDmaapModelFromJsonArray(List items) { LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); @@ -97,25 +110,59 @@ public class DmaapConsumerJsonParser { .orElseGet(JsonObject::new))))); } + /** + * Extract info from string and create @see {@link ConsumerDmaapModel}. + * + * @param monoMessage - results from Kafka + * @return reactive DMaaPModel + * + */ + /** + * @author Shilpa Urade on 13/3/23 + */ + + public Flux getConsumerDmaapModelFromKafkaConsumerRecord(java.util.List items) + { + LOGGER.info("DmaapConsumerJsonParser input for parsing: {} with commit", items); + if (items.size() == 0) { + LOGGER.info("Nothing to consume from Kafka"); + return Flux.empty(); + } + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonObjectFromString -> getJsonObjectFromString(jsonObjectFromString) + .orElseGet(JsonObject::new))))); + } + + Optional getJsonObjectFromString(String element) { + return Optional.ofNullable(JsonParser.parseString(element).getAsJsonObject()); + } + + public String getSourceName() { + return sourceName; + } + Optional getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } + + Optional getJsonObjectFromKafkaRecords(String element) { + return Optional.ofNullable(new JsonObject().getAsJsonObject(element)); } + private Flux create(Flux jsonObject) { - return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") - : transform(monoJsonP)) - .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); + return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") + : transform(monoJsonP)); } private Mono transform(JsonObject responseFromDmaap) { JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(COMMON_EVENT_HEADER); + .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(PNF_REGISTRATION_FIELDS); - + .getAsJsonObject(PNF_REGISTRATION_FIELDS); this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); @@ -126,21 +173,20 @@ public class DmaapConsumerJsonParser { this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE); this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION); this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS); - return (StringUtils.isEmpty(pnfSourceName)) - ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " - + printMessage()) : - Mono.just(ImmutableConsumerDmaapModel.builder() - .correlationId(pnfSourceName) - .ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address) - .serialNumber(pnfSerialNumberOptionalField) - .equipVendor(pnfEquipVendorOptionalField) - .equipModel(pnfEquipModelOptionalField) - .equipType(pnfEquipTypeOptionalField) - .nfRole(pnfNfRoleOptionalField) - .swVersion(pnfSwVersionOptionalField) - .additionalFields(pnfAdditionalFields).build()); + ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " + + printMessage()) : + Mono.just(ImmutableConsumerDmaapModel.builder() + .correlationId(pnfSourceName) + .ipv4(pnfOamIpv4Address) + .ipv6(pnfOamIpv6Address) + .serialNumber(pnfSerialNumberOptionalField) + .equipVendor(pnfEquipVendorOptionalField) + .equipModel(pnfEquipModelOptionalField) + .equipType(pnfEquipTypeOptionalField) + .nfRole(pnfNfRoleOptionalField) + .swVersion(pnfSwVersionOptionalField) + .additionalFields(pnfAdditionalFields).build()); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { @@ -148,30 +194,39 @@ public class DmaapConsumerJsonParser { } private boolean containsHeader(JsonObject jsonObject) { - return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); + try { + return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); + }catch(Exception e){ + LOGGER.info("Fetching an error in containsHeader method {}",e.getMessage()); + } + return false; } private String printMessage() { return String.format("%n{" - + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," - + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT - + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, - this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, - this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, - this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields + + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," + + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT + + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, + this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, + this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, + this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields ); } private Mono logErrorAndReturnMonoEmpty(String messageForLogger) { - LOGGER.warn(messageForLogger); + LOGGER.info(messageForLogger); return Mono.empty(); } + + public JSONArray getJsonArray(String value) throws JSONException { + return new JSONArray(value); + } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index 35eb948b..ce8059b2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +26,6 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import reactor.core.publisher.Mono; @FunctionalInterface -interface AaiProducerTask { +public interface AaiProducerTask { Mono execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java index 11ff369a..5f86010a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +24,8 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import reactor.core.publisher.Mono; -@FunctionalInterface + public interface AaiQueryTask { Mono execute(final ConsumerDmaapModel aaiModel); + Mono findPnfinAAI(final ConsumerDmaapModel model); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java index 3db4887a..4a7eef58 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java @@ -3,10 +3,10 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -35,6 +35,8 @@ import org.onap.dcaegen2.services.prh.model.RelationshipDict; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Component public class AaiQueryTaskImpl implements AaiQueryTask { @@ -44,6 +46,7 @@ public class AaiQueryTaskImpl implements AaiQueryTask { static final String SERVICE_TYPE = "service-subscription.service-type"; static final String SERVICE_INSTANCE_ID = "service-instance.service-instance-id"; + private static final Logger LOGGER = LoggerFactory.getLogger(AaiQueryTaskImpl.class); private final AaiHttpClient getPnfModelClient; private final AaiHttpClient getServiceClient; @@ -55,8 +58,11 @@ public class AaiQueryTaskImpl implements AaiQueryTask { this.getServiceClient = getServiceClient; } + + @Override public Mono execute(ConsumerDmaapModel aaiModel) { + return getPnfModelClient .getAaiResponse(aaiModel) .flatMap(this::checkIfPnfHasRelationToService) @@ -65,7 +71,22 @@ public class AaiQueryTaskImpl implements AaiQueryTask { .defaultIfEmpty(false); } + + // Added by DTAG, March 2023 + @Override + public Mono findPnfinAAI(final ConsumerDmaapModel model) { + + return getPnfModelClient + .getAaiResponse(model) + .flatMap(aaiModel -> Mono.just(model)); + + + } + + + private Mono checkIfPnfHasRelationToService(final AaiPnfResultModel model) { + return Mono .justOrEmpty(model.getRelationshipList()) .map(this::findRelatedTo) @@ -88,10 +109,12 @@ public class AaiQueryTaskImpl implements AaiQueryTask { } private Boolean checkIfRelatedServiceInstanceIsActive(final AaiServiceInstanceResultModel model) { + return ACTIVE_STATUS.equalsIgnoreCase(model.getOrchestrationStatus()); } private Optional findRelatedTo(final Relationship data) { + return Optional.ofNullable(data.getRelationship()) .map(Stream::of) .orElseGet(Stream::empty) @@ -101,10 +124,12 @@ public class AaiQueryTaskImpl implements AaiQueryTask { } private Optional findValue(final List data, final String key) { + return data .stream() .filter(y -> key.equals(y.getRelationshipKey())) .findFirst() .map(RelationshipData::getRelationshipValue); } + } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 68a44ebc..f305a925 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,45 +21,53 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.function.Predicate; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.context.annotation.Profile; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; /** * @author Przemysław Wąsala on 3/23/18 */ +/** + * @author Sangeeta Bellara on 3/12/23 + */ + +@Profile("!autoCommitDisabled") @Component public class ScheduledTasks { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); - - private final DmaapConsumerTask dmaapConsumerTask; - private final DmaapPublisherTask dmaapReadyProducerTask; - private final DmaapPublisherTask dmaapUpdateProducerTask; - private final AaiQueryTask aaiQueryTask; - private final AaiProducerTask aaiProducerTask; - private final BbsActionsTask bbsActionsTask; + private static Boolean pnfFound = true; + private DmaapConsumerTask dmaapConsumerTask; + + private DmaapPublisherTask dmaapReadyProducerTask; + private DmaapPublisherTask dmaapUpdateProducerTask; + private AaiQueryTask aaiQueryTask; + private AaiProducerTask aaiProducerTask; + private BbsActionsTask bbsActionsTask; private Map mdcContextMap; /** @@ -69,6 +78,7 @@ public class ScheduledTasks { * @param dmaapUpdatePublisherTask - fourth task * @param aaiPublisherTask - second task */ + @Autowired public ScheduledTasks( final DmaapConsumerTask dmaapConsumerTask, @@ -90,8 +100,8 @@ public class ScheduledTasks { static class State { public final ConsumerDmaapModel dmaapModel; public final Boolean activationStatus; - - public State(final ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { + + public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { this.dmaapModel = dmaapModel; this.activationStatus = activationStatus; } @@ -139,7 +149,7 @@ public class ScheduledTasks { private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { - LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); + LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable); } } @@ -153,7 +163,8 @@ public class ScheduledTasks { } private Mono queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { - return aaiQueryTask + LOGGER.info("Find AAI Info --> "+monoDMaaPModel.getCorrelationId()); + return aaiQueryTask .execute(monoDMaaPModel) .map(x -> new State(monoDMaaPModel, x)); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index 70c54a51..09e06da7 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +33,7 @@ import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import org.springframework.context.event.EventListener; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; @@ -40,6 +42,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; /** * @author Przemysław Wąsala on 6/13/18 */ +@Profile("!autoCommitDisabled") @Configuration @EnableScheduling public class ScheduledTasksRunner { @@ -58,9 +61,11 @@ public class ScheduledTasksRunner { this.prhProperties = prhProperties; } + String profile = System.getenv("SPRING_PROFILES_ACTIVE"); + @EventListener public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) { - tryToStartTask(); + tryToStartTask(); } /** @@ -88,5 +93,5 @@ public class ScheduledTasksRunner { return false; } } - } + diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java new file mode 100644 index 00000000..4bf49208 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; + +/** + * This class will return start date time of the day and end date time of the day in epoch format. + * @author Mohd Usman Khan on 3/13/23 + */ + +@Component +public class EpochDateTimeConversion { + + private static final Logger LOGGER = LoggerFactory.getLogger(EpochDateTimeConversion.class); + + private String daysForRecords = System.getenv("number_of_days"); + + public Long getStartDateOfTheDay(){ + return getEpochDateTime(atStartOfDay(getCurrentDate())); + } + + public Long getEndDateOfTheDay(){ + return getEpochDateTime(atEndOfDay(getCurrentDate())); + } + + private Long getEpochDateTime(Date date) + { + DateTimeFormatter dtf = DateTimeFormatter.ofPattern("E MMM dd HH:mm:ss zzz yyyy"); + ZonedDateTime zdt = ZonedDateTime.parse( date.toString(),dtf); + return zdt.toInstant().toEpochMilli(); + } + + private Date getCurrentDate() + { + return new java.util.Date(System.currentTimeMillis()); + } + + public Date atStartOfDay(Date date) { + LocalDateTime localDateTime = dateToLocalDateTime(date); + if(daysForRecords==null) + daysForRecords="1"; + LocalDateTime previousDay = localDateTime.minusDays(Integer.parseInt(daysForRecords) - 1l); + LocalDateTime previousStartTime = previousDay.with(LocalTime.MIN); + return localDateTimeToDate(previousStartTime); + } + + private Date atEndOfDay(Date date) { + LocalDateTime localDateTime = dateToLocalDateTime(date); + LocalDateTime endOfDay = localDateTime.with(LocalTime.MAX); + return localDateTimeToDate(endOfDay); + } + + private LocalDateTime dateToLocalDateTime(Date date) { + return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()); + } + + private Date localDateTimeToDate(LocalDateTime localDateTime) { + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + } + + public String getDaysForRecords() { + return daysForRecords; + } + + public void setDaysForRecords(String daysForRecords) { + this.daysForRecords = daysForRecords; + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java new file mode 100644 index 00000000..4c70c713 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.springframework.boot.configurationprocessor.json.JSONException; +import reactor.core.publisher.Flux; + +/** + * @author Ajinkya Patil on 3/13/23 + */ + +public interface KafkaConsumerTask { + Flux execute() throws JSONException; + + void commitOffset(); +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java new file mode 100644 index 00000000..30e6cff1 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.BatchAcknowledgingMessageListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Ajinkya Patil on 3/13/23 + */ + +@Profile("autoCommitDisabled") +@Component +public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledgingMessageListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTaskImpl.class); + + @Autowired + private DmaapConsumerJsonParser dmaapConsumerJsonParser; + + @Autowired + private EpochDateTimeConversion epochDateTimeConversion; + + private List jsonEvent = new ArrayList<>(); + + private Acknowledgment offset; + + String kafkaTopic = System.getenv("kafkaTopic"); + + String groupIdConfig = System.getenv("groupIdConfig"); + + @Override + @KafkaListener(topics = "${kafkaTopic}", groupId = "${groupIdConfig}") + public void onMessage(List> list, Acknowledgment acknowledgment) { + + if (list != null && !list.isEmpty()) { + + + list.stream().filter(consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay()) + .map(ConsumerRecord::value) + .forEach(value -> { + jsonEvent.add(value); + }); + + + } + + + offset = acknowledgment; + } + + @Override + public Flux execute() throws JSONException { + return dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent); + } + + @Override + public void commitOffset() { + if(!jsonEvent.isEmpty()){ + jsonEvent.clear(); + } + if(offset != null){ + offset.acknowledge(); + } + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java new file mode 100644 index 00000000..64d7798e --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import javax.annotation.PreDestroy; +import org.onap.dcaegen2.services.prh.configuration.PrhProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * @author Pravin Kokane on 3/13/23 + */ + +@Profile("autoCommitDisabled") +@Configuration +@EnableScheduling +public class ScheduledTasksRunnerWithCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksRunnerWithCommit.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); + private static List scheduledPrhTaskFutureList = new ArrayList<>(); + + private final TaskScheduler taskScheduler; + private final PrhProperties prhProperties; + + @Autowired + private ScheduledTasksWithCommit scheduledTasksWithCommit; + + public ScheduledTasksRunnerWithCommit(TaskScheduler taskScheduler, ScheduledTasksWithCommit scheduledTasksWithCommit, + PrhProperties prhProperties) { + this.taskScheduler = taskScheduler; + this.scheduledTasksWithCommit = scheduledTasksWithCommit; + this.prhProperties = prhProperties; + } + + @EventListener + public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) { + tryToStartTaskWithCommit(); + } + + /** + * Function which have to stop tasks execution. + */ + @PreDestroy + public synchronized void cancelTasks() { + scheduledPrhTaskFutureList.forEach(x -> x.cancel(false)); + scheduledPrhTaskFutureList.clear(); + } + + /** + * Function for starting scheduling PRH workflow. + * + * @return status of operation execution: true - started, false - not started + */ + + public synchronized boolean tryToStartTaskWithCommit() { + LOGGER.info(ENTRY, "Start scheduling PRH workflow with Commit Tasks Runner"); + if (scheduledPrhTaskFutureList.isEmpty()) { + Collections.synchronizedList(scheduledPrhTaskFutureList); + scheduledPrhTaskFutureList.add(taskScheduler + .scheduleWithFixedDelay(scheduledTasksWithCommit::scheduleKafkaPrhEventTask, + prhProperties.getWorkflowSchedulingInterval())); + return true; + } else { + return false; + } + } + +} + diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java new file mode 100644 index 00000000..b0eae949 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java @@ -0,0 +1,213 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks.commit; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; +import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask; +import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask; +import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask; +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.context.annotation.Profile; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Sangeeta Bellara on 3/13/23 + */ +@Profile("autoCommitDisabled") +@Component +public class ScheduledTasksWithCommit { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksWithCommit.class); + private static Boolean pnfFound = true; + private KafkaConsumerTask kafkaConsumerTask; + private DmaapPublisherTask dmaapReadyProducerTask; + private DmaapPublisherTask dmaapUpdateProducerTask; + private AaiQueryTask aaiQueryTask; + private AaiProducerTask aaiProducerTask; + private BbsActionsTask bbsActionsTask; + private Map mdcContextMap; + + /** + * Constructor for tasks registration in PRHWorkflow. + * + * @param kafkaConsumerTask - fist task + * @param dmaapReadyPublisherTask - third task + * @param dmaapUpdatePublisherTask - fourth task + * @param aaiPublisherTask - second task + */ + @Autowired + public ScheduledTasksWithCommit( + final KafkaConsumerTask kafkaConsumerTask, + @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, + @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, + final AaiQueryTask aaiQueryTask, + final AaiProducerTask aaiPublisherTask, + final BbsActionsTask bbsActionsTask, + final Map mdcContextMap) { + this.dmaapReadyProducerTask = dmaapReadyPublisherTask; + this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask; + this.kafkaConsumerTask=kafkaConsumerTask; + this.aaiQueryTask = aaiQueryTask; + this.aaiProducerTask = aaiPublisherTask; + this.bbsActionsTask = bbsActionsTask; + this.mdcContextMap = mdcContextMap; + } + + static class State { + public ConsumerDmaapModel dmaapModel; + public Boolean activationStatus; + + public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { + this.dmaapModel = dmaapModel; + this.activationStatus = activationStatus; + } + } + + public void scheduleKafkaPrhEventTask() { + MdcVariables.setMdcContextMap(mdcContextMap); + try { + LOGGER.info("Execution of tasks was registered with commit"); + CountDownLatch mainCountDownLatch = new CountDownLatch(1); + consumeFromKafkaMessage() + .flatMap(model->queryAaiForPnf(model) + .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e); + disableCommit(); + }) + .onErrorResume(e -> Mono.empty()) + + ) + .flatMap(this::queryAaiForConfiguration) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::processAdditionalFields) + .flatMap(this::publishToDmaapConfiguration) + .onErrorResume(e -> Mono.empty()) + + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onCompleteKafka); + mainCountDownLatch.await(); + } catch (InterruptedException | JSONException e ) { + LOGGER.warn("Interruption problem on countDownLatch {}", e); + Thread.currentThread().interrupt(); + } + } + + private static void disableCommit() + { + pnfFound=false; + } + + private void onCompleteKafka() { + LOGGER.info("PRH tasks have been completed"); + if(pnfFound){ + kafkaConsumerTask.commitOffset(); + LOGGER.info("Committed the Offset"); + } + else + { + LOGGER.info("Offset not Committed"); + pnfFound=true; + } + } + + + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + String statusCodeOk = HttpStatus.OK.name(); + MDC.put(RESPONSE_CODE, statusCodeOk); + LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk); + MDC.remove(RESPONSE_CODE); + } + } + + private void onError(Throwable throwable) { + if (!(throwable instanceof DmaapEmptyResponseException)) { + LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable); + } + } + + private Flux consumeFromKafkaMessage() throws JSONException { + return kafkaConsumerTask.execute(); + } + + private Mono queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { + return aaiQueryTask + .execute(monoDMaaPModel) + .map(x -> new State(monoDMaaPModel, x)); + } + + private Mono queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) { + + LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId()); + return aaiQueryTask.findPnfinAAI(monoDMaaPModel); + } + + + private Mono publishToAaiConfiguration(final State state) { + try { + return aaiProducerTask + .execute(state.dmaapModel) + .map(x -> state); + } catch (PrhTaskException e) { + LOGGER.warn("AAIProducerTask exception has been registered: {}", e); + return Mono.error(e); + } + } + + private Mono processAdditionalFields(final State state) { + if (state.activationStatus) { + LOGGER.debug("Re-registration - Logical links won't be updated."); + return Mono.just(state); + } + return bbsActionsTask.execute(state.dmaapModel).map(x -> state); + } + + private Flux publishToDmaapConfiguration(final State state) { + try { + if (state.activationStatus) { + LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + return dmaapUpdateProducerTask.execute(state.dmaapModel); + } + return dmaapReadyProducerTask.execute(state.dmaapModel); + } catch (PrhTaskException e) { + LOGGER.warn("DMaaPProducerTask exception has been registered: ", e); + return Flux.error(e); + } + } +} diff --git a/prh-app-server/src/main/resources/application.yaml b/prh-app-server/src/main/resources/application.yaml index 8f1950d0..e62d4e90 100644 --- a/prh-app-server/src/main/resources/application.yaml +++ b/prh-app-server/src/main/resources/application.yaml @@ -1,6 +1,6 @@ spring: - profiles: - active: prod + profiles: prod + server: port: 8433 ssl: @@ -25,4 +25,13 @@ logging: management.endpoints.web.exposure.include: "*" +--- +spring: + profiles: + default: prod + + + + + diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java new file mode 100644 index 00000000..22b82e3d --- /dev/null +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class KafkaConfigTest { + + @InjectMocks + KafkaConfig kafkaConfig; + + @BeforeEach + void setUp() { + kafkaConfig.kafkaBoostrapServerConfig = "0.0.0.0"; + kafkaConfig.groupIdConfig = "consumer-test"; + kafkaConfig.kafkaSecurityProtocol = "test"; + kafkaConfig.kafkaSaslMechanism = "test"; + kafkaConfig.kafkaUsername = "test"; + kafkaConfig.kafkaPassword = "test"; + kafkaConfig.kafkaJaasConfig = null; + kafkaConfig.kafkaLoginModuleClassConfig = "test"; + kafkaConfig.kafkaJaasConfig = "test"; + } + + @Test + public void consumerFactoryTest(){ + kafkaConfig.consumerFactory(); + } + + @Test + public void kafkaListenerContainerFactoryTest(){ + kafkaConfig.kafkaListenerContainerFactory(); + } +} diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java index ebdec09e..bbc6b968 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +43,7 @@ class ScheduleControllerTest { @Autowired private WebTestClient webTestClient; - @Test + @Test void startEndpointShouldAllowStartingPrhTasks() { when(scheduledTasksRunner.tryToStartTask()).thenReturn(true); webTestClient @@ -72,4 +73,4 @@ class ScheduleControllerTest { verify(scheduledTasksRunner).cancelTasks(); } -} \ No newline at end of file +} diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java index 01beb88b..1a6c76c4 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2019-2021 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,11 +39,29 @@ import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static com.github.tomakehurst.wiremock.client.WireMock.patch; + + + + + import java.nio.file.Files; import java.nio.file.Paths; -import static com.github.tomakehurst.wiremock.client.WireMock.*; + import static java.lang.ClassLoader.getSystemResource; import static java.util.Collections.singletonList; diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java index 9dab7aaa..ba759354 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +21,12 @@ package org.onap.dcaegen2.services.prh.service; -import static org.mockito.Mockito.spy; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.collection.List; -import java.util.Optional; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; @@ -37,6 +34,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.Optional; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + /** * @author Przemysław Wąsala on 5/8/18 */ @@ -108,7 +110,7 @@ class DmaapConsumerJsonParserTest { //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser @@ -171,7 +173,7 @@ class DmaapConsumerJsonParserTest { //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = new JsonParser().parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser .getJsonObject(Mono.just((response))).blockFirst(); @@ -238,7 +240,7 @@ class DmaapConsumerJsonParserTest { //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser @@ -302,7 +304,7 @@ class DmaapConsumerJsonParserTest { //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser @@ -334,7 +336,7 @@ class DmaapConsumerJsonParserTest { DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); String incorrectMessage = "{\"event\": {" + "\"commonEventHeader\": {}," @@ -380,7 +382,7 @@ class DmaapConsumerJsonParserTest { DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); String jsonWithoutSourceName = "{\"event\": {" @@ -430,7 +432,7 @@ class DmaapConsumerJsonParserTest { DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); String jsonWithoutIpInformation = "{\"event\": {" @@ -497,7 +499,7 @@ class DmaapConsumerJsonParserTest { //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); dmaapConsumerJsonParser.getJsonObject(Mono.just((response))); @@ -573,7 +575,7 @@ class DmaapConsumerJsonParserTest { //when DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser()); JsonElement jsonElement = jsonParser.parse(parsed); - Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())) + doReturn(Optional.of(jsonElement.getAsJsonObject())) .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement); ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response)) .blockFirst(); diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java index e81b3746..517fe73a 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java @@ -3,6 +3,7 @@ * PROJECT * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +27,8 @@ import static org.mockito.Mockito.mock; import java.util.Collections; import java.util.List; + + import org.assertj.core.util.Lists; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +41,7 @@ import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiPnfResultModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiServiceInstanceQueryModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiServiceInstanceResultModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableRelationshipData; import org.onap.dcaegen2.services.prh.model.Relationship; import org.onap.dcaegen2.services.prh.model.RelationshipData; @@ -84,18 +88,6 @@ class AaiQueryTaskImplTest { sut = new AaiQueryTaskImpl(getPnfModelClient, getServiceClient); } - @Test - void whenPnfIsUnavailable_ShouldThrowException() { - //given - given(getPnfModelClient.getAaiResponse(aaiModel)).willReturn(Mono.error(new Exception("404"))); - - //when - final Mono task = sut.execute(aaiModel); - - //then - Assertions.assertThrows(Exception.class, task::block); - } - @Test void whenPnfIsAvailableButRelationshipIsNull_ShouldReturnFalse() { //given @@ -203,4 +195,12 @@ class AaiQueryTaskImplTest { private void configurePnfClient(final ConsumerDmaapModel aaiModel, final AaiPnfResultModel pnfResultModel) { given(getPnfModelClient.getAaiResponse(aaiModel)).willReturn(Mono.just(pnfResultModel)); } + + @Test + void testFindPnfInAAIActive(){ + ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder().correlationId("123").build(); + configurePnfClient(model, pnfResultModel); + Mono test = sut.findPnfinAAI(model); + Assertions.assertNotNull(test); + } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java new file mode 100644 index 00000000..850587e0 --- /dev/null +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class EpochDateTimeConversionTest { + + private EpochDateTimeConversion epochDateTimeConversion; + + @BeforeEach + void setUp() { + epochDateTimeConversion = new EpochDateTimeConversion(); + epochDateTimeConversion.setDaysForRecords("3"); + } + + @Test + public void getStartDateOfTheDayTest(){ + epochDateTimeConversion.getDaysForRecords(); + Long day = epochDateTimeConversion.getStartDateOfTheDay(); + Assertions.assertNotNull(day); + } + + @Test + public void getEndDateOfTheDayTest(){ + Long day = epochDateTimeConversion.getEndDateOfTheDay(); + Assertions.assertNotNull(day); + } +} diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java new file mode 100644 index 00000000..c23a1886 --- /dev/null +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; +import org.onap.dcaegen2.services.prh.tasks.commit.EpochDateTimeConversion; +import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.kafka.support.Acknowledgment; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class KafkaConsumerTaskImplTest { + + @Mock + private Acknowledgment acknowledgment; + + @Mock + private DmaapConsumerJsonParser dmaapConsumerJsonParser; + + @Mock + private EpochDateTimeConversion epochDateTimeConversion; + + @InjectMocks + private KafkaConsumerTaskImpl kafkaConsumerTask; + + @Test + public void onMessageTest(){ + List> list = new ArrayList<>(); + TimestampType timestampType = null; + Headers headers = new RecordHeaders(); + epochDateTimeConversion.setDaysForRecords("3"); + ConsumerRecord records = new ConsumerRecord<> + ("test-topic", 1, 1l, 0l, timestampType, 1, 1, "test-key", "test-value", headers + , null); + list.add(records); + kafkaConsumerTask.onMessage(list, acknowledgment); + } + + @Test + public void commitOffsetTest(){ + kafkaConsumerTask.commitOffset(); + } + + @Test + public void executeTest() throws JSONException { + List jsonEvent = new ArrayList<>(); + ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().correlationId("123").build(); + when(dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent)).thenReturn(Flux.just(consumerDmaapModel)); + kafkaConsumerTask.execute(); + } +} diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java new file mode 100644 index 00000000..401e351f --- /dev/null +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java @@ -0,0 +1,72 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.services.prh.configuration.PrhProperties; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.scheduling.TaskScheduler; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@ExtendWith(MockitoExtension.class) +public class ScheduledTasksRunnerWithCommitTest { + + @Mock + private ScheduledTasksWithCommit scheduledTasksWithCommit; + + @Mock + private TaskScheduler taskScheduler; + + @Mock + private PrhProperties prhProperties; + + @Mock + private ApplicationStartedEvent applicationStartedEvent; + + private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit; + + @BeforeEach + void setUp() { + scheduledTasksRunnerWithCommit = new ScheduledTasksRunnerWithCommit(taskScheduler, scheduledTasksWithCommit, prhProperties); + } + + @Test + void onApplicationStartedEvent() { + scheduledTasksRunnerWithCommit.onApplicationStartedEvent(applicationStartedEvent); + assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit()); + } + + @Test + void cancelTasks() { + scheduledTasksRunnerWithCommit.cancelTasks(); + } + + @Test + void tryToStartTaskWithCommit() { + scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit(); + assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit()); + } +} diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java new file mode 100644 index 00000000..64779027 --- /dev/null +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java @@ -0,0 +1,263 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.Map; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; +import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask; +import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask; +import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask; +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.springframework.boot.configurationprocessor.json.JSONException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + + +@ExtendWith(MockitoExtension.class) +class ScheduledTasksWithCommitTest { + private final static ConsumerDmaapModel DMAAP_MODEL = + ImmutableConsumerDmaapModel + .builder() + .correlationId("SomeId") + .ipv4("ipv4") + .ipv6("ipv6") + .build(); + + @Mock + private DmaapPublisherTask readyPublisher; + + @Mock + private DmaapPublisherTask updatePublisher; + + + @Mock + private BbsActionsTask bbsActionsTask; + + @Mock + private KafkaConsumerTask kafkaConsumerTask; + + @Mock + private AaiQueryTask aaiQueryTask; + + @Mock + private AaiProducerTask aaiProducerTask; + + private final Map context = Collections.emptyMap(); + + private ScheduledTasksWithCommit sut; + + @BeforeEach + void setUp() { + sut = new ScheduledTasksWithCommit( + kafkaConsumerTask, + readyPublisher, + updatePublisher, + aaiQueryTask, + aaiProducerTask, + bbsActionsTask, + context); + } + + @Test + void testQueryAAiForPNFOnSuccess() throws JSONException, PrhTaskException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false ); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL)); + when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true)); + when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL)); + when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse)); + + sut.scheduleKafkaPrhEventTask(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + @Test + void testQueryAAiForPNF() throws JSONException, PrhTaskException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL)); + when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true)); + when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL)); + when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse)); + + sut.scheduleKafkaPrhEventTask(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + @Test + void testQueryAAiForPNFOnError() throws JSONException, PrhTaskException { + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + + sut.scheduleKafkaPrhEventTask(); + + verifyThatPnfUpdateWasNotSentToAai(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic(); + } + + @Test + void testQueryAAiForPNFOnPRHException() throws JSONException, PrhTaskException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false ); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL)); + when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true)); + when(aaiProducerTask.execute(state.dmaapModel)).thenThrow(new PrhTaskException()); + + sut.scheduleKafkaPrhEventTask(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + @Test + void queryAAiForPNFOnPRHExceptionTest() throws JSONException, PrhTaskException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL)); + when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true)); + when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL)); + when(updatePublisher.execute(state.dmaapModel)).thenThrow(new PrhTaskException()); + + sut.scheduleKafkaPrhEventTask(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + @Test + void queryAAiForPNFOnPRHExceptionOnDmaapEmptyResponseExceptionTest() throws JSONException, PrhTaskException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL)); + when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true)); + when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL)); + when(updatePublisher.execute(state.dmaapModel)).thenThrow(new DmaapEmptyResponseException()); + + sut.scheduleKafkaPrhEventTask(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + @Test + void queryAAiForPNFOnPRHExceptionOnFalseTest() throws JSONException, PrhTaskException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL)); + when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL)); + when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(false)); + when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL)); + + sut.scheduleKafkaPrhEventTask(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + @Test + void queryAAiForPNFOnPRHExceptionOnJSONExceptionTest() throws PrhTaskException, JSONException { + ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true); + MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() { + @Override + public @Nullable String failReason() { + return null; + } + }; + when(kafkaConsumerTask.execute()).thenThrow(new JSONException("json format exception")); + + sut.scheduleKafkaPrhEventTask(); + + verifyIfLogicalLinkWasNotCreated(); + verifyThatPnfModelWasNotSentDmaapPnfReadyTopic(); + } + + private void verifyThatPnfModelWasNotSentDmaapPnfReadyTopic() throws PrhTaskException { + verify(readyPublisher, never()).execute(DMAAP_MODEL); + } + + private void verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic() throws PrhTaskException { + verify(updatePublisher, never()).execute(DMAAP_MODEL); + } + + private void verifyThatPnfUpdateWasNotSentToAai() throws PrhTaskException { + verify(aaiProducerTask, never()).execute(DMAAP_MODEL); + } + + private void verifyIfLogicalLinkWasNotCreated(){ + verify(bbsActionsTask, never()).execute(DMAAP_MODEL); + } +} + diff --git a/prh-app-server/src/test/resources/application.yaml b/prh-app-server/src/test/resources/application.yaml index fa7f11cf..85ab663c 100644 --- a/prh-app-server/src/test/resources/application.yaml +++ b/prh-app-server/src/test/resources/application.yaml @@ -2,7 +2,9 @@ spring: profiles: active: prod + + logging: level: org.onap.dcaegen2.services.prh: debug - org.onap.dcaegen2.services.sdk: debug \ No newline at end of file + org.onap.dcaegen2.services.sdk: debug -- cgit 1.2.3-korg