From e1e4154a3914f0878e26ea2e1683ebdfe507f5a9 Mon Sep 17 00:00:00 2001
From: "sangeeta.bellara"
Date: Thu, 9 Mar 2023 22:13:03 +0530
Subject: PRH Code Additions for Early PNF registrations

Issue-ID: DCAEGEN2-3312
Change-Id: Id9b1ca83390af3675e26fc61ccc8d12611ab8ddf
Signed-off-by: sangeeta.bellara
Signed-off-by: sangeeta.bellara

Change-Id: I9bc25bc1343c40aca5644de3fd30f7c2142c1a47
Signed-off-by: sangeeta.bellara
---
 .../services/prh/configuration/KafkaConfig.java    |  96 ++++++++++
 .../prh/controllers/ScheduleController.java        |  20 +-
 .../prh/service/DmaapConsumerJsonParser.java       | 173 +++++++++++------
 .../services/prh/tasks/AaiProducerTask.java        |   3 +-
 .../dcaegen2/services/prh/tasks/AaiQueryTask.java  |   4 +-
 .../services/prh/tasks/AaiQueryTaskImpl.java       |  27 ++-
 .../services/prh/tasks/ScheduledTasks.java         |  51 +++--
 .../services/prh/tasks/ScheduledTasksRunner.java   |   9 +-
 .../prh/tasks/commit/EpochDateTimeConversion.java  |  95 +++++++++
 .../prh/tasks/commit/KafkaConsumerTask.java        |  35 ++++
 .../prh/tasks/commit/KafkaConsumerTaskImpl.java    |  99 ++++++++++
 .../commit/ScheduledTasksRunnerWithCommit.java     |  99 ++++++++++
 .../prh/tasks/commit/ScheduledTasksWithCommit.java | 213 +++++++++++++++++++++
 13 files changed, 833 insertions(+), 91 deletions(-)
 create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java
 create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java
 create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java
 create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java
 create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
 create mode 100644 prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java

(limited to 'prh-app-server/src/main/java/org/onap')

diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java
new file mode 100644
index 00000000..8affe281
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Pravin Kokane on 3/13/23 + */ + +@Profile("autoCommitDisabled") +@EnableKafka +@Configuration +public class KafkaConfig +{ + String kafkaBoostrapServerConfig = System.getenv("kafkaBoostrapServerConfig"); + + String groupIdConfig = System.getenv("groupIdConfig"); + + + String kafkaSecurityProtocol = System.getenv("kafkaSecurityProtocol"); + + String kafkaSaslMechanism = System.getenv("kafkaSaslMechanism"); + + String kafkaUsername = System.getenv("kafkaUsername"); + + String kafkaPassword = System.getenv("kafkaPassword"); + + String kafkaJaasConfig = System.getenv("JAAS_CONFIG"); + + String kafkaLoginModuleClassConfig = System.getenv("Login_Module_Class"); + + @Bean + public ConsumerFactory consumerFactory() + { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaBoostrapServerConfig); + config.put(ConsumerConfig.GROUP_ID_CONFIG,groupIdConfig); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + if(kafkaJaasConfig == null) { + kafkaJaasConfig = kafkaLoginModuleClassConfig + " required username=\"" + + kafkaUsername + "\" password=\"" + kafkaPassword + "\";"; + } + if(kafkaSecurityProtocol==null ) kafkaSecurityProtocol="SASL_PLAINTEXT"; + config.put("security.protocol", kafkaSecurityProtocol); + if(kafkaSaslMechanism==null ) kafkaSaslMechanism="SCRAM-SHA-512"; + config.put("sasl.mechanism", kafkaSaslMechanism); + + config.put("sasl.jaas.config", kafkaJaasConfig); + + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() + { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index a0aa17e3..0b1f0e1c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA 
Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +24,7 @@ package org.onap.dcaegen2.services.prh.controllers; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner; +import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -40,30 +42,34 @@ import reactor.core.publisher.Mono; @Api(value = "ScheduleController", description = "Schedule Controller") public class ScheduleController { + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class); - private final ScheduledTasksRunner scheduledTasksRunner; + private ScheduledTasksRunner scheduledTasksRunner; - @Autowired + + @Autowired(required = false) public ScheduleController(ScheduledTasksRunner scheduledTasksRunner) { this.scheduledTasksRunner = scheduledTasksRunner; } + + @RequestMapping(value = "start", method = RequestMethod.GET) @ApiOperation(value = "Start scheduling worker request") public Mono> startTasks() { - LOGGER.trace("Receiving start scheduling worker request"); - return Mono.fromSupplier(scheduledTasksRunner::tryToStartTask).map(this::createStartTaskResponse); + return Mono.fromSupplier(scheduledTasksRunner::tryToStartTask).map(this::createStartTaskResponse); } + @RequestMapping(value = "stopPrh", method = RequestMethod.GET) @ApiOperation(value = "Receiving stop scheduling worker request") public Mono> stopTask() { LOGGER.trace("Receiving stop scheduling worker request"); return Mono.defer(() -> { - scheduledTasksRunner.cancelTasks(); - return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); - } + scheduledTasksRunner.cancelTasks(); + return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); + } ); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index f98e952f..25c380fb 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,43 +18,48 @@ * limitations under the License. 
* ============LICENSE_END========================================================= */ - package org.onap.dcaegen2.services.prh.service; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; -import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.collection.List; -import java.util.Optional; -import java.util.stream.StreamSupport; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.configurationprocessor.json.JSONArray; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.boot.configurationprocessor.json.JSONObject; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.ArrayList; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; +import static 
org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; + + + /** * @author Przemysław Wąsala on 5/8/18 @@ -74,6 +80,8 @@ public class DmaapConsumerJsonParser { private String pnfSwVersionOptionalField; private JsonObject pnfAdditionalFields; + private String sourceName; + /** * Extract info from string and create @see {@link ConsumerDmaapModel}. * @@ -84,6 +92,11 @@ public class DmaapConsumerJsonParser { return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } + public JSONObject getJsonObjectKafka(String jsonStr) throws JSONException { + return new JSONObject(jsonStr); + } + + private Flux getConsumerDmaapModelFromJsonArray(List items) { LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); @@ -97,25 +110,59 @@ public class DmaapConsumerJsonParser { .orElseGet(JsonObject::new))))); } + /** + * Extract info from string and create @see {@link ConsumerDmaapModel}. + * + * @param monoMessage - results from Kafka + * @return reactive DMaaPModel + * + */ + /** + * @author Shilpa Urade on 13/3/23 + */ + + public Flux getConsumerDmaapModelFromKafkaConsumerRecord(java.util.List items) + { + LOGGER.info("DmaapConsumerJsonParser input for parsing: {} with commit", items); + if (items.size() == 0) { + LOGGER.info("Nothing to consume from Kafka"); + return Flux.empty(); + } + return create( + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonObjectFromString -> getJsonObjectFromString(jsonObjectFromString) + .orElseGet(JsonObject::new))))); + } + + Optional getJsonObjectFromString(String element) { + return Optional.ofNullable(JsonParser.parseString(element).getAsJsonObject()); + } + + public String getSourceName() { + return sourceName; + } + Optional getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } + + Optional getJsonObjectFromKafkaRecords(String element) { + return Optional.ofNullable(new JsonObject().getAsJsonObject(element)); } + private Flux create(Flux jsonObject) { - return jsonObject.flatMap(monoJsonP -> - !containsHeader(monoJsonP) ? logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") - : transform(monoJsonP)) - .onErrorResume(exception -> exception instanceof DmaapNotFoundException, e -> Mono.empty()); + return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) ? 
logErrorAndReturnMonoEmpty("Incorrect JsonObject - missing header") + : transform(monoJsonP)); } private Mono transform(JsonObject responseFromDmaap) { JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(COMMON_EVENT_HEADER); + .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) - .getAsJsonObject(PNF_REGISTRATION_FIELDS); - + .getAsJsonObject(PNF_REGISTRATION_FIELDS); this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); @@ -126,21 +173,20 @@ public class DmaapConsumerJsonParser { this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE); this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION); this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS); - return (StringUtils.isEmpty(pnfSourceName)) - ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " - + printMessage()) : - Mono.just(ImmutableConsumerDmaapModel.builder() - .correlationId(pnfSourceName) - .ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address) - .serialNumber(pnfSerialNumberOptionalField) - .equipVendor(pnfEquipVendorOptionalField) - .equipModel(pnfEquipModelOptionalField) - .equipType(pnfEquipTypeOptionalField) - .nfRole(pnfNfRoleOptionalField) - .swVersion(pnfSwVersionOptionalField) - .additionalFields(pnfAdditionalFields).build()); + ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " + + printMessage()) : + Mono.just(ImmutableConsumerDmaapModel.builder() + .correlationId(pnfSourceName) + .ipv4(pnfOamIpv4Address) + .ipv6(pnfOamIpv6Address) + .serialNumber(pnfSerialNumberOptionalField) + .equipVendor(pnfEquipVendorOptionalField) + .equipModel(pnfEquipModelOptionalField) + .equipType(pnfEquipTypeOptionalField) + .nfRole(pnfNfRoleOptionalField) + .swVersion(pnfSwVersionOptionalField) + .additionalFields(pnfAdditionalFields).build()); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { @@ -148,30 +194,39 @@ public class DmaapConsumerJsonParser { } private boolean containsHeader(JsonObject jsonObject) { - return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); + try { + return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); + }catch(Exception e){ + LOGGER.info("Fetching an error in containsHeader method {}",e.getMessage()); + } + return false; } private String printMessage() { return String.format("%n{" - + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," - + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," - + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," - + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT - + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, - this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, - this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, - 
this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields + + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," + + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT + + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, + this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, + this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, + this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields ); } private Mono logErrorAndReturnMonoEmpty(String messageForLogger) { - LOGGER.warn(messageForLogger); + LOGGER.info(messageForLogger); return Mono.empty(); } + + public JSONArray getJsonArray(String value) throws JSONException { + return new JSONArray(value); + } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index 35eb948b..ce8059b2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +26,6 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import reactor.core.publisher.Mono; @FunctionalInterface -interface AaiProducerTask { +public interface AaiProducerTask { Mono execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java index 11ff369a..5f86010a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -23,7 +24,8 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import reactor.core.publisher.Mono; -@FunctionalInterface + public interface AaiQueryTask { Mono execute(final ConsumerDmaapModel aaiModel); + Mono findPnfinAAI(final ConsumerDmaapModel model); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java index 3db4887a..4a7eef58 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java @@ -3,10 +3,10 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -35,6 +35,8 @@ import org.onap.dcaegen2.services.prh.model.RelationshipDict; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Component public class AaiQueryTaskImpl implements AaiQueryTask { @@ -44,6 +46,7 @@ public class AaiQueryTaskImpl implements AaiQueryTask { static final String SERVICE_TYPE = "service-subscription.service-type"; static final String SERVICE_INSTANCE_ID = "service-instance.service-instance-id"; + private static final Logger LOGGER = LoggerFactory.getLogger(AaiQueryTaskImpl.class); private final AaiHttpClient getPnfModelClient; private final AaiHttpClient getServiceClient; @@ -55,8 +58,11 @@ public class AaiQueryTaskImpl implements AaiQueryTask { this.getServiceClient = getServiceClient; } + + @Override public Mono execute(ConsumerDmaapModel aaiModel) { + return getPnfModelClient .getAaiResponse(aaiModel) .flatMap(this::checkIfPnfHasRelationToService) @@ -65,7 +71,22 @@ public class AaiQueryTaskImpl implements AaiQueryTask { .defaultIfEmpty(false); } + + // Added by DTAG, March 2023 + @Override + public Mono findPnfinAAI(final ConsumerDmaapModel model) { + + return getPnfModelClient + .getAaiResponse(model) + .flatMap(aaiModel -> Mono.just(model)); + + + } + + + private Mono checkIfPnfHasRelationToService(final AaiPnfResultModel model) { + return Mono .justOrEmpty(model.getRelationshipList()) .map(this::findRelatedTo) @@ -88,10 +109,12 @@ public class AaiQueryTaskImpl implements AaiQueryTask { } private Boolean checkIfRelatedServiceInstanceIsActive(final AaiServiceInstanceResultModel model) { + return ACTIVE_STATUS.equalsIgnoreCase(model.getOrchestrationStatus()); } private Optional findRelatedTo(final Relationship data) { + return Optional.ofNullable(data.getRelationship()) .map(Stream::of) .orElseGet(Stream::empty) @@ -101,10 +124,12 @@ public class AaiQueryTaskImpl implements AaiQueryTask { } private Optional findValue(final List data, final String key) { + return data .stream() .filter(y -> key.equals(y.getRelationshipKey())) .findFirst() .map(RelationshipData::getRelationshipValue); } + } diff 
--git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 68a44ebc..f305a925 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,45 +21,53 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.function.Predicate; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.context.annotation.Profile; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; /** * @author Przemysław Wąsala on 3/23/18 */ +/** + * @author Sangeeta Bellara on 3/12/23 + */ + +@Profile("!autoCommitDisabled") @Component public class ScheduledTasks { private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); - - private final DmaapConsumerTask dmaapConsumerTask; - private final DmaapPublisherTask dmaapReadyProducerTask; - private final DmaapPublisherTask dmaapUpdateProducerTask; - private final AaiQueryTask aaiQueryTask; - private final AaiProducerTask aaiProducerTask; - private final BbsActionsTask bbsActionsTask; + private static Boolean pnfFound = true; + private DmaapConsumerTask dmaapConsumerTask; + + private DmaapPublisherTask dmaapReadyProducerTask; + private DmaapPublisherTask dmaapUpdateProducerTask; + private AaiQueryTask aaiQueryTask; + private AaiProducerTask 
aaiProducerTask; + private BbsActionsTask bbsActionsTask; private Map mdcContextMap; /** @@ -69,6 +78,7 @@ public class ScheduledTasks { * @param dmaapUpdatePublisherTask - fourth task * @param aaiPublisherTask - second task */ + @Autowired public ScheduledTasks( final DmaapConsumerTask dmaapConsumerTask, @@ -90,8 +100,8 @@ public class ScheduledTasks { static class State { public final ConsumerDmaapModel dmaapModel; public final Boolean activationStatus; - - public State(final ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { + + public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { this.dmaapModel = dmaapModel; this.activationStatus = activationStatus; } @@ -139,7 +149,7 @@ public class ScheduledTasks { private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { - LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); + LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable); } } @@ -153,7 +163,8 @@ public class ScheduledTasks { } private Mono queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { - return aaiQueryTask + LOGGER.info("Find AAI Info --> "+monoDMaaPModel.getCorrelationId()); + return aaiQueryTask .execute(monoDMaaPModel) .map(x -> new State(monoDMaaPModel, x)); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index 70c54a51..09e06da7 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -3,6 +3,7 @@ * PNF-REGISTRATION-HANDLER * ================================================================================ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -32,6 +33,7 @@ import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import org.springframework.context.event.EventListener; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; @@ -40,6 +42,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; /** * @author Przemysław Wąsala on 6/13/18 */ +@Profile("!autoCommitDisabled") @Configuration @EnableScheduling public class ScheduledTasksRunner { @@ -58,9 +61,11 @@ public class ScheduledTasksRunner { this.prhProperties = prhProperties; } + String profile = System.getenv("SPRING_PROFILES_ACTIVE"); + @EventListener public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) { - tryToStartTask(); + tryToStartTask(); } /** @@ -88,5 +93,5 @@ public class ScheduledTasksRunner { return false; } } - } + diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java new file mode 100644 index 00000000..4bf49208 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversion.java @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; + +/** + * This class will return start date time of the day and end date time of the day in epoch format. 
+ * @author Mohd Usman Khan on 3/13/23 + */ + +@Component +public class EpochDateTimeConversion { + + private static final Logger LOGGER = LoggerFactory.getLogger(EpochDateTimeConversion.class); + + private String daysForRecords = System.getenv("number_of_days"); + + public Long getStartDateOfTheDay(){ + return getEpochDateTime(atStartOfDay(getCurrentDate())); + } + + public Long getEndDateOfTheDay(){ + return getEpochDateTime(atEndOfDay(getCurrentDate())); + } + + private Long getEpochDateTime(Date date) + { + DateTimeFormatter dtf = DateTimeFormatter.ofPattern("E MMM dd HH:mm:ss zzz yyyy"); + ZonedDateTime zdt = ZonedDateTime.parse( date.toString(),dtf); + return zdt.toInstant().toEpochMilli(); + } + + private Date getCurrentDate() + { + return new java.util.Date(System.currentTimeMillis()); + } + + public Date atStartOfDay(Date date) { + LocalDateTime localDateTime = dateToLocalDateTime(date); + if(daysForRecords==null) + daysForRecords="1"; + LocalDateTime previousDay = localDateTime.minusDays(Integer.parseInt(daysForRecords) - 1l); + LocalDateTime previousStartTime = previousDay.with(LocalTime.MIN); + return localDateTimeToDate(previousStartTime); + } + + private Date atEndOfDay(Date date) { + LocalDateTime localDateTime = dateToLocalDateTime(date); + LocalDateTime endOfDay = localDateTime.with(LocalTime.MAX); + return localDateTimeToDate(endOfDay); + } + + private LocalDateTime dateToLocalDateTime(Date date) { + return LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()); + } + + private Date localDateTimeToDate(LocalDateTime localDateTime) { + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + } + + public String getDaysForRecords() { + return daysForRecords; + } + + public void setDaysForRecords(String daysForRecords) { + this.daysForRecords = daysForRecords; + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java new file mode 100644 index 00000000..4c70c713 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTask.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks.commit; + +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.springframework.boot.configurationprocessor.json.JSONException; +import reactor.core.publisher.Flux; + +/** + * @author Ajinkya Patil on 3/13/23 + */ + +public interface KafkaConsumerTask { + Flux execute() throws JSONException; + + void commitOffset(); +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java new file mode 100644 index 00000000..30e6cff1 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImpl.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Ajinkya Patil on 3/13/23
+ */
+
+@Profile("autoCommitDisabled")
+@Component
+public class KafkaConsumerTaskImpl implements KafkaConsumerTask, BatchAcknowledgingMessageListener<String, String> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTaskImpl.class);
+
+    @Autowired
+    private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+    @Autowired
+    private EpochDateTimeConversion epochDateTimeConversion;
+
+    private List<String> jsonEvent = new ArrayList<>();
+
+    private Acknowledgment offset;
+
+    String kafkaTopic = System.getenv("kafkaTopic");
+
+    String groupIdConfig = System.getenv("groupIdConfig");
+
+    @Override
+    @KafkaListener(topics = "${kafkaTopic}", groupId = "${groupIdConfig}")
+    public void onMessage(List<ConsumerRecord<String, String>> list, Acknowledgment acknowledgment) {
+
+        if (list != null && !list.isEmpty()) {
+
+            list.stream().filter(consumerRecord -> consumerRecord.timestamp() >= epochDateTimeConversion.getStartDateOfTheDay() && consumerRecord.timestamp() <= epochDateTimeConversion.getEndDateOfTheDay())
+                .map(ConsumerRecord::value)
+                .forEach(value -> {
+                    jsonEvent.add(value);
+                });
+
+        }
+
+        offset = acknowledgment;
+    }
+
+    @Override
+    public Flux<ConsumerDmaapModel> execute() throws JSONException {
+        return dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent);
+    }
+
+    @Override
+    public void commitOffset() {
+        if (!jsonEvent.isEmpty()) {
+            jsonEvent.clear();
+        }
+        if (offset != null) {
+            offset.acknowledge();
+        }
+    }
+
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
new file mode 100644
index 00000000..64d7798e
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommit.java
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcaegen2.services.prh.tasks.commit; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import javax.annotation.PreDestroy; +import org.onap.dcaegen2.services.prh.configuration.PrhProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * @author Pravin Kokane on 3/13/23 + */ + +@Profile("autoCommitDisabled") +@Configuration +@EnableScheduling +public class ScheduledTasksRunnerWithCommit { + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksRunnerWithCommit.class); + private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); + private static List scheduledPrhTaskFutureList = new ArrayList<>(); + + private final TaskScheduler taskScheduler; + private final PrhProperties prhProperties; + + @Autowired + private ScheduledTasksWithCommit scheduledTasksWithCommit; + + public ScheduledTasksRunnerWithCommit(TaskScheduler taskScheduler, ScheduledTasksWithCommit scheduledTasksWithCommit, + PrhProperties prhProperties) { + this.taskScheduler = taskScheduler; + this.scheduledTasksWithCommit = scheduledTasksWithCommit; + this.prhProperties = prhProperties; + } + + @EventListener + public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) { + tryToStartTaskWithCommit(); + } + + /** + * Function which have to stop tasks execution. + */ + @PreDestroy + public synchronized void cancelTasks() { + scheduledPrhTaskFutureList.forEach(x -> x.cancel(false)); + scheduledPrhTaskFutureList.clear(); + } + + /** + * Function for starting scheduling PRH workflow. 
+ * + * @return status of operation execution: true - started, false - not started + */ + + public synchronized boolean tryToStartTaskWithCommit() { + LOGGER.info(ENTRY, "Start scheduling PRH workflow with Commit Tasks Runner"); + if (scheduledPrhTaskFutureList.isEmpty()) { + Collections.synchronizedList(scheduledPrhTaskFutureList); + scheduledPrhTaskFutureList.add(taskScheduler + .scheduleWithFixedDelay(scheduledTasksWithCommit::scheduleKafkaPrhEventTask, + prhProperties.getWorkflowSchedulingInterval())); + return true; + } else { + return false; + } + } + +} + diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java new file mode 100644 index 00000000..b0eae949 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommit.java @@ -0,0 +1,213 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks.commit; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; +import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; +import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask; +import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask; +import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask; +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.configurationprocessor.json.JSONException; +import org.springframework.context.annotation.Profile; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Sangeeta Bellara on 3/13/23 + */ +@Profile("autoCommitDisabled") +@Component +public class ScheduledTasksWithCommit { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksWithCommit.class); + private static Boolean pnfFound = true; + private KafkaConsumerTask kafkaConsumerTask; + private DmaapPublisherTask dmaapReadyProducerTask; + private DmaapPublisherTask dmaapUpdateProducerTask; + private AaiQueryTask aaiQueryTask; + private AaiProducerTask aaiProducerTask; + private BbsActionsTask bbsActionsTask; + private Map mdcContextMap; + + /** + * Constructor for tasks registration in PRHWorkflow. 
+     *
+     * @param kafkaConsumerTask - first task
+     * @param dmaapReadyPublisherTask - third task
+     * @param dmaapUpdatePublisherTask - fourth task
+     * @param aaiPublisherTask - second task
+     */
+    @Autowired
+    public ScheduledTasksWithCommit(
+        final KafkaConsumerTask kafkaConsumerTask,
+        @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
+        @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
+        final AaiQueryTask aaiQueryTask,
+        final AaiProducerTask aaiPublisherTask,
+        final BbsActionsTask bbsActionsTask,
+        final Map<String, String> mdcContextMap) {
+        this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
+        this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
+        this.kafkaConsumerTask = kafkaConsumerTask;
+        this.aaiQueryTask = aaiQueryTask;
+        this.aaiProducerTask = aaiPublisherTask;
+        this.bbsActionsTask = bbsActionsTask;
+        this.mdcContextMap = mdcContextMap;
+    }
+
+    static class State {
+        public ConsumerDmaapModel dmaapModel;
+        public Boolean activationStatus;
+
+        public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
+            this.dmaapModel = dmaapModel;
+            this.activationStatus = activationStatus;
+        }
+    }
+
+    public void scheduleKafkaPrhEventTask() {
+        MdcVariables.setMdcContextMap(mdcContextMap);
+        try {
+            LOGGER.info("Execution of tasks was registered with commit");
+            CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+            consumeFromKafkaMessage()
+                .flatMap(model -> queryAaiForPnf(model)
+                    .doOnError(e -> {
+                        LOGGER.info("PNF Not Found in AAI --> {}", e.getMessage());
+                        disableCommit();
+                    })
+                    .onErrorResume(e -> Mono.empty())
+                )
+                .flatMap(this::queryAaiForConfiguration)
+                .flatMap(this::publishToAaiConfiguration)
+                .flatMap(this::processAdditionalFields)
+                .flatMap(this::publishToDmaapConfiguration)
+                .onErrorResume(e -> Mono.empty())
+                .doOnTerminate(mainCountDownLatch::countDown)
+                .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
+            mainCountDownLatch.await();
+        } catch (InterruptedException | JSONException e) {
+            LOGGER.warn("Interruption problem on countDownLatch", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private static void disableCommit() {
+        pnfFound = false;
+    }
+
+    private void onCompleteKafka() {
+        LOGGER.info("PRH tasks have been completed");
+        if (pnfFound) {
+            kafkaConsumerTask.commitOffset();
+            LOGGER.info("Committed the Offset");
+        } else {
+            LOGGER.info("Offset not Committed");
+            pnfFound = true;
+        }
+    }
+
+    private void onSuccess(MessageRouterPublishResponse response) {
+        if (response.successful()) {
+            String statusCodeOk = HttpStatus.OK.name();
+            MDC.put(RESPONSE_CODE, statusCodeOk);
+            LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
+            MDC.remove(RESPONSE_CODE);
+        }
+    }
+
+    private void onError(Throwable throwable) {
+        if (!(throwable instanceof DmaapEmptyResponseException)) {
+            LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+        }
+    }
+
+    private Flux<ConsumerDmaapModel> consumeFromKafkaMessage() throws JSONException {
+        return kafkaConsumerTask.execute();
+    }
+
+    private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
+        return aaiQueryTask
+            .execute(monoDMaaPModel)
+            .map(x -> new State(monoDMaaPModel, x));
+    }
+
+    private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) {
+        LOGGER.info("Find PNF --> {}", monoDMaaPModel.getCorrelationId());
+        return aaiQueryTask.findPnfinAAI(monoDMaaPModel);
+    }
+
+    private Mono<State> publishToAaiConfiguration(final State state) {
+        try {
+            return aaiProducerTask
+                .execute(state.dmaapModel)
+                .map(x -> state);
+        } catch (PrhTaskException e) {
+            LOGGER.warn("AAIProducerTask exception has been registered: ", e);
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<State> processAdditionalFields(final State state) {
+        if (state.activationStatus) {
+            LOGGER.debug("Re-registration - Logical links won't be updated.");
+            return Mono.just(state);
+        }
+        return bbsActionsTask.execute(state.dmaapModel).map(x -> state);
+    }
+
+    private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) {
+        try {
+            if (state.activationStatus) {
+                LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+                return dmaapUpdateProducerTask.execute(state.dmaapModel);
+            }
+            return dmaapReadyProducerTask.execute(state.dmaapModel);
+        } catch (PrhTaskException e) {
+            LOGGER.warn("DMaaPProducerTask exception has been registered: ", e);
+            return Flux.error(e);
+        }
+    }
+}
-- cgit 1.2.3-korg
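
Reviewer note (not part of the patch): KafkaConsumerTaskImpl only keeps Kafka records whose timestamp falls inside the window produced by EpochDateTimeConversion, i.e. from the start of the day (number_of_days - 1) days back up to the end of the current day, in the system time zone. The standalone sketch below reproduces that window with java.time directly so the filter is easy to verify; the class name EpochWindowSketch and the fallback of number_of_days=1 are illustrative assumptions, not part of the change.

import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;

// Illustrative sketch only: mirrors the accept-window that EpochDateTimeConversion
// computes for the record filter in KafkaConsumerTaskImpl.onMessage(...).
public class EpochWindowSketch {
    public static void main(String[] args) {
        int days = Integer.parseInt(System.getenv().getOrDefault("number_of_days", "1"));
        ZoneId zone = ZoneId.systemDefault();
        LocalDate today = LocalDate.now(zone);

        // Start of the day (days - 1) days back, as epoch milliseconds.
        long start = today.minusDays(days - 1L).atStartOfDay(zone).toInstant().toEpochMilli();
        // End of the current day (LocalTime.MAX), as epoch milliseconds.
        long end = today.atTime(LocalTime.MAX).atZone(zone).toInstant().toEpochMilli();

        // A record is kept when: start <= record.timestamp() && record.timestamp() <= end
        System.out.println("Accept records with timestamp in [" + start + ", " + end + "]");
    }
}

Records with timestamps outside that window are dropped before parsing, so only registrations from the configured day range reach the PRH workflow and are eligible for the manual offset commit.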