diff options
Diffstat (limited to 'cps-ncmp-service/src/main')
28 files changed, 991 insertions, 1085 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java deleted file mode 100644 index ff322ee3cc..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.data.models.DatastoreType; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class CmNotificationSubscriptionDelta { - - private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService; - - /** - * Get the delta for a given predicates list. - * - * @param dmiCmNotificationSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates - * @return delta list of DmiCmNotificationSubscriptionPredicates - */ - public List<DmiCmNotificationSubscriptionPredicate> getDelta( - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { - final List<DmiCmNotificationSubscriptionPredicate> delta = new ArrayList<>(); - - for (final DmiCmNotificationSubscriptionPredicate cmNotificationSubscriptionPredicate: - dmiCmNotificationSubscriptionPredicates) { - - final Set<String> targetCmHandleIds = new HashSet<>(); - final Set<String> xpaths = new HashSet<>(); - final DatastoreType datastoreType = cmNotificationSubscriptionPredicate.getDatastoreType(); - - for (final String cmHandleId : cmNotificationSubscriptionPredicate.getTargetCmHandleIds()) { - for (final String xpath : cmNotificationSubscriptionPredicate.getXpaths()) { - if (!cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType, - cmHandleId, xpath)) { - xpaths.add(xpath); - targetCmHandleIds.add(cmHandleId); - - } - } - } - - populateValidDmiCmNotificationSubscriptionPredicateDelta(targetCmHandleIds, xpaths, datastoreType, delta); - } - return delta; - } - - private void populateValidDmiCmNotificationSubscriptionPredicateDelta(final Set<String> targetCmHandleIds, - final Set<String> xpaths, final DatastoreType datastoreType, - final List<DmiCmNotificationSubscriptionPredicate> delta) { - if (!(targetCmHandleIds.isEmpty() || xpaths.isEmpty())) { - final DmiCmNotificationSubscriptionPredicate predicateDelta = - new DmiCmNotificationSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths); - delta.add(predicateDelta); - } - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java deleted file mode 100644 index 50a5df537d..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (c) 2024 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an 'AS IS' BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription; - -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionDmiInEventProducer; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class CmNotificationSubscriptionEventsHandler { - private final CmNotificationSubscriptionNcmpOutEventProducer cmNotificationSubscriptionNcmpOutEventProducer; - private final CmNotificationSubscriptionDmiInEventProducer cmNotificationSubscriptionDmiInEventProducer; - - /** - * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud - * Event compliant. - * - * @param subscriptionId Cm Subscription id - * @param eventType Type of event - * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the - * client - * @param isScheduledEvent Determines if the event is to be scheduled - * or published now - */ - public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent - cmNotificationSubscriptionNcmpOutEvent, - final boolean isScheduledEvent) { - cmNotificationSubscriptionNcmpOutEventProducer.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - eventType, cmNotificationSubscriptionNcmpOutEvent, isScheduledEvent); - } - - /** - * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format. - * - * @param subscriptionId Cm Subscription id - * @param dmiPluginName Dmi Plugin Name - * @param eventType Type of event - * @param cmNotificationSubscriptionDmiInEvent Cm Notification Subscription event for Dmi - */ - public void publishCmNotificationSubscriptionDmiInEvent(final String subscriptionId, final String dmiPluginName, - final String eventType, - final CmNotificationSubscriptionDmiInEvent - cmNotificationSubscriptionDmiInEvent) { - cmNotificationSubscriptionDmiInEventProducer.publishCmNotificationSubscriptionDmiInEvent(subscriptionId, - dmiPluginName, eventType, cmNotificationSubscriptionDmiInEvent); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java deleted file mode 100644 index 73f9563ecf..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription; - -import java.util.List; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionDmiInEventMapper; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class CmNotificationSubscriptionMappersHandler { - - private final CmNotificationSubscriptionDmiInEventMapper cmNotificationSubscriptionDmiInEventMapper; - private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper; - - /** - * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription. - * - * @param dmiCmNotificationSubscriptionPredicates Collection of Cm Notification Subscription predicates - * @return cm notification subscription dmi in event - */ - public CmNotificationSubscriptionDmiInEvent toCmNotificationSubscriptionDmiInEvent( - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { - return cmNotificationSubscriptionDmiInEventMapper.toCmNotificationSubscriptionDmiInEvent( - dmiCmNotificationSubscriptionPredicates); - } - - /** - * Mapper to form a response for the client for the Cm Notification Subscription. - * - * @param subscriptionId Cm Notification Subscription id - * @param dmiCmNotificationSubscriptionDetailsMap contains CmNotificationSubscriptionDetails per dmi plugin - * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client - */ - public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap) { - return cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - dmiCmNotificationSubscriptionDetailsMap); - } - - /** - * Mapper to form a rejected response for the client for the Cm Notification Subscription Request. - * - * @param subscriptionId subscription id - * @param rejectedTargetFilters list of rejected target filters for the subscription request - * @return to sent back to the client - */ - public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEventForRejectedRequest( - final String subscriptionId, final List<String> rejectedTargetFilters) { - return cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest( - subscriptionId, rejectedTargetFilters); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java deleted file mode 100644 index f7dd51e637..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription; - -import static org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer.buildAndGetCmNotificationNcmpOutEventAsCloudEvent; - -import io.cloudevents.CloudEvent; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.onap.cps.utils.JsonObjectMapper; - -@Slf4j -@RequiredArgsConstructor -public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Runnable { - - private final String topicName; - private final String subscriptionId; - private final String eventType; - private final EventsPublisher<CloudEvent> eventsPublisher; - private final JsonObjectMapper jsonObjectMapper; - private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler; - private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; - - /** - * Delegating the responsibility of publishing CmNotificationSubscriptionNcmpOutEvent as a separate task which will - * be called after a specified delay. - */ - @Override - public void run() { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap = - dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId); - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent = - cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - dmiCmNotificationSubscriptionDetailsMap); - eventsPublisher.publishCloudEvent(topicName, subscriptionId, - buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, - cmNotificationSubscriptionNcmpOutEvent)); - dmiCmNotificationSubscriptionCacheHandler - .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java deleted file mode 100644 index 978a4cdfe2..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer; - -import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED; -import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED; -import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; - -import io.cloudevents.CloudEvent; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.NcmpResponseStatus; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class CmNotificationSubscriptionDmiOutEventConsumer { - - private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; - private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler; - private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler; - - private static final String CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#"; - - /** - * Consume the Cm Notification Subscription event from the dmi-plugin. - * - * @param cmNotificationSubscriptionDmiOutEventConsumerRecord the event to be consumed - */ - @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}", - containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeCmNotificationSubscriptionDmiOutEvent( - final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiOutEventConsumerRecord) { - final CloudEvent cloudEvent = cmNotificationSubscriptionDmiOutEventConsumerRecord.value(); - final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent = - toTargetEvent(cloudEvent, CmNotificationSubscriptionDmiOutEvent.class); - final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid")); - if (cmNotificationSubscriptionDmiOutEvent != null && correlationId != null) { - final String eventType = cloudEvent.getType(); - handleCmSubscriptionDmiOutEvent(correlationId, eventType, cmNotificationSubscriptionDmiOutEvent); - } - } - - private void handleCmSubscriptionDmiOutEvent(final String correlationId, - final String eventType, - final CmNotificationSubscriptionDmiOutEvent - cmNotificationSubscriptionDmiOutEvent) { - final String subscriptionId = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0]; - final String dmiPluginName = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1]; - - if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, cmNotificationSubscriptionDmiOutEvent.getData())) { - handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED); - if (eventType.equals("subscriptionCreateResponse")) { - dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); - } - if (eventType.equals("subscriptionDeleteResponse")) { - dmiCmNotificationSubscriptionCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName); - } - handleEventsStatusPerDmi(subscriptionId, eventType); - } - - if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, cmNotificationSubscriptionDmiOutEvent.getData())) { - handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED); - handleEventsStatusPerDmi(subscriptionId, eventType); - } - - log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId, - dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage()); - } - - private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName, - final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) { - dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId, - dmiPluginName, cmNotificationSubscriptionStatus); - } - - private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi = - dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId); - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent = - cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - dmiCmNotificationSubscriptionDetailsPerDmi); - cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - eventType, cmNotificationSubscriptionNcmpOutEvent, false); - } - - private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus, - final Data cmNotificationSubscriptionDmiOutData) { - return ncmpResponseStatus.getCode().equals(cmNotificationSubscriptionDmiOutData.getStatusCode()) - && ncmpResponseStatus.getMessage() - .equals(cmNotificationSubscriptionDmiOutData.getStatusMessage()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java deleted file mode 100644 index ea21751691..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class CmNotificationSubscriptionNcmpOutEventMapper { - - /** - * Mapper to form a response for the client for the Cm Notification Subscription. - * - * @param subscriptionId Cm Notification Subscription Id - * @param dmiCmNotificationSubscriptionDetailsMap contains CmNotificationSubscriptionDetails per dmi plugin - * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client - */ - public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap) { - - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent = - new CmNotificationSubscriptionNcmpOutEvent(); - final Data cmSubscriptionData = new Data(); - cmSubscriptionData.setSubscriptionId(subscriptionId); - populateCmNotificationSubscriptionNcmpOutEventWithCmHandleIds(dmiCmNotificationSubscriptionDetailsMap, - cmSubscriptionData); - cmNotificationSubscriptionNcmpOutEvent.setData(cmSubscriptionData); - - return cmNotificationSubscriptionNcmpOutEvent; - } - - /** - * Mapper to form a rejected response for the client for the Cm Notification Subscription Request. - * - * @param subscriptionId subscription id - * @param rejectedTargetFilters list of rejected target filters for the subscription request - * @return to sent back to the client - */ - public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEventForRejectedRequest( - final String subscriptionId, final List<String> rejectedTargetFilters) { - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent = - new CmNotificationSubscriptionNcmpOutEvent(); - final Data cmSubscriptionData = new Data(); - cmSubscriptionData.setSubscriptionId(subscriptionId); - cmSubscriptionData.setRejectedTargets(rejectedTargetFilters); - cmNotificationSubscriptionNcmpOutEvent.setData(cmSubscriptionData); - return cmNotificationSubscriptionNcmpOutEvent; - } - - private void populateCmNotificationSubscriptionNcmpOutEventWithCmHandleIds( - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap, - final Data cmSubscriptionData) { - - final List<String> acceptedCmHandleIds = new ArrayList<>(); - final List<String> pendingCmHandleIds = new ArrayList<>(); - final List<String> rejectedCmHandleIds = new ArrayList<>(); - - dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> { - final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus = - dmiCmNotificationSubscriptionDetails.getCmNotificationSubscriptionStatus(); - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates = - dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates(); - - switch (cmNotificationSubscriptionStatus) { - case ACCEPTED -> acceptedCmHandleIds.addAll( - extractCmHandleIds(dmiCmNotificationSubscriptionPredicates)); - case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmNotificationSubscriptionPredicates)); - default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmNotificationSubscriptionPredicates)); - } - }); - - cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds); - cmSubscriptionData.setPendingTargets(pendingCmHandleIds); - cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds); - - } - - private List<String> extractCmHandleIds( - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { - final List<String> cmHandleIds = new ArrayList<>(); - dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll( - dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds())); - - return cmHandleIds; - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java deleted file mode 100644 index ed7ed2a0ba..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription.producer; - -import io.cloudevents.CloudEvent; -import io.cloudevents.core.builder.CloudEventBuilder; -import java.net.URI; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsPublisher; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionNcmpOutEventPublishingTask; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.onap.cps.utils.JsonObjectMapper; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class CmNotificationSubscriptionNcmpOutEventProducer { - - @Value("${app.ncmp.avc.cm-subscription-ncmp-out}") - private String cmNotificationSubscriptionNcmpOutEventTopic; - - @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}") - private Integer cmNotificationSubscriptionDmiOutEventTimeoutInMs; - - private final EventsPublisher<CloudEvent> eventsPublisher; - private final JsonObjectMapper jsonObjectMapper; - private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler; - private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>(); - - /** - * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud - * Event compliant. - * - * @param subscriptionId Cm Subscription Id - * @param eventType Type of event - * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the - * client - * @param isScheduledEvent Determines if the event is to be scheduled - * or published now - */ - public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent - cmNotificationSubscriptionNcmpOutEvent, - final boolean isScheduledEvent) { - - if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) { - final ScheduledFuture<?> scheduledFuture = - scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType); - scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture); - log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId); - } else { - cancelScheduledTaskForSubscriptionId(subscriptionId); - publishCmNotificationSubscriptionNcmpOutEventNow(subscriptionId, eventType, - cmNotificationSubscriptionNcmpOutEvent); - log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId); - } - } - - private ScheduledFuture<?> scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, - final String eventType) { - final CmNotificationSubscriptionNcmpOutEventPublishingTask - cmNotificationSubscriptionNcmpOutEventPublishingTask = - new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic, - subscriptionId, eventType, eventsPublisher, jsonObjectMapper, - cmNotificationSubscriptionMappersHandler, dmiCmNotificationSubscriptionCacheHandler); - return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask, - cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS); - } - - private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) { - - final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId); - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - scheduledTasksPerSubscriptionId.remove(subscriptionId); - } - - } - - - private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent - cmNotificationSubscriptionNcmpOutEvent) { - final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent = - buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, - cmNotificationSubscriptionNcmpOutEvent); - eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId, - cmNotificationSubscriptionNcmpOutEventAsCloudEvent); - dmiCmNotificationSubscriptionCacheHandler - .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId); - } - - /** - * Get an NCMP out event as cloud event. - * - * @param jsonObjectMapper JSON object mapper - * @param subscriptionId subscription id - * @param eventType event type - * @param cmNotificationSubscriptionNcmpOutEvent cm notification subscription NCMP out event - * @return cm notification subscription NCMP out event as cloud event - */ - public static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent( - final JsonObjectMapper jsonObjectMapper, final String subscriptionId, final String eventType, - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) { - - return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType) - .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0")) - .withExtension("correlationid", subscriptionId) - .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build(); - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java deleted file mode 100644 index 08e3c95529..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription.service; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionDelta; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent; -import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent; -import org.springframework.stereotype.Service; - -@Service -@RequiredArgsConstructor -public class CmNotificationSubscriptionHandlerServiceImpl implements CmNotificationSubscriptionHandlerService { - - private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService; - private final CmNotificationSubscriptionDelta cmNotificationSubscriptionDelta; - private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler; - private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler; - private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler; - - @Override - public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) { - if (cmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) { - dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates); - handleCmNotificationSubscriptionDelta(subscriptionId); - scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId, - "subscriptionCreateResponse"); - } else { - rejectAndPublishCmNotificationSubscriptionCreateRequest(subscriptionId, predicates); - } - } - - @Override - public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) { - dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates); - sendSubscriptionDeleteRequestToDmi(subscriptionId); - scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); - } - - private void scheduleCmNotificationSubscriptionNcmpOutEventResponse(final String subscriptionId, - final String eventType) { - cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - eventType, null, true); - } - - private void rejectAndPublishCmNotificationSubscriptionCreateRequest(final String subscriptionId, - final List<Predicate> predicates) { - final Set<String> subscriptionTargetFilters = - predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream()) - .collect(Collectors.toSet()); - final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent = - cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest( - subscriptionId, new ArrayList<>(subscriptionTargetFilters)); - cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, - "subscriptionCreateResponse", cmNotificationSubscriptionNcmpOutEvent, false); - } - - private void handleCmNotificationSubscriptionDelta(final String subscriptionId) { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap = - dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId); - dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> { - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates = - cmNotificationSubscriptionDelta.getDelta( - dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates()); - - if (dmiCmNotificationSubscriptionPredicates.isEmpty()) { - acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(subscriptionId, dmiPluginName); - } else { - publishCmNotificationSubscriptionDmiInEventPerDmi(subscriptionId, dmiPluginName, - dmiCmNotificationSubscriptionPredicates); - } - }); - } - - private void publishCmNotificationSubscriptionDmiInEventPerDmi(final String subscriptionId, - final String dmiPluginName, - final List<DmiCmNotificationSubscriptionPredicate> - dmiCmNotificationSubscriptionPredicates) { - final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent = - cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent( - dmiCmNotificationSubscriptionPredicates); - cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId, - dmiPluginName, "subscriptionCreateRequest", cmNotificationSubscriptionDmiInEvent); - } - - private void acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(final String subscriptionId, - final String dmiPluginName) { - dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId, - dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED); - dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); - } - - private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap = - dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId); - dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> { - final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent = - cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent( - dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates()); - cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId, - dmiPluginName, "subscriptionDeleteRequest", cmNotificationSubscriptionDmiInEvent); - }); - } -}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java deleted file mode 100644 index d87624c23c..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.cmsubscription.service; - -import java.util.Collection; -import org.onap.cps.ncmp.api.data.models.DatastoreType; - -public interface CmNotificationSubscriptionPersistenceService { - - String NCMP_DATASPACE_NAME = "NCMP-Admin"; - String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions"; - - /** - * Check if we have an ongoing cm subscription based on the parameters. - * - * @param datastoreType the susbcription target datastore type - * @param cmHandleId the id of the cm handle for the susbcription - * @param xpath the target xpath - * @return true for ongoing cmsubscription , otherwise false - */ - boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath); - - /** - * Check if the subscription ID is unique against ongoing subscriptions. - * - * @param subscriptionId subscription ID - * @return true if subscriptionId is not used in active subscriptions, otherwise false - */ - boolean isUniqueSubscriptionId(final String subscriptionId); - - /** - * Get all ongoing cm notification subscription based on the parameters. - * - * @param datastoreType the susbcription target datastore type - * @param cmHandleId the id of the cm handle for the susbcription - * @param xpath the target xpath - * @return collection of subscription ids of ongoing cm notification subscription - */ - Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType, - final String cmHandleId, final String xpath); - - /** - * Add cm notification subscription. - * - * @param datastoreType the susbcription target datastore type - * @param cmHandleId the id of the cm handle for the susbcription - * @param xpath the target xpath - * @param newSubscriptionId subscription id to be added - */ - void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath, final String newSubscriptionId); - - /** - * Remove cm notification Subscription. - * - * @param datastoreType the susbcription target datastore type - * @param cmHandleId the id of the cm handle for the susbcription - * @param xpath the target xpath - * @param subscriptionId subscription id to remove - */ - void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath, final String subscriptionId); - -} - diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java new file mode 100644 index 0000000000..fbe21267d9 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2024 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription; + +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class EventsFacade { + private final NcmpOutEventProducer ncmpOutEventProducer; + private final DmiInEventProducer dmiInEventProducer; + + /** + * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud + * Event compliant. + * + * @param subscriptionId Cm Subscription id + * @param eventType Type of event + * @param ncmpOutEvent Cm Notification Subscription Event for the + * client + * @param isScheduledEvent Determines if the event is to be scheduled + * or published now + */ + public void publishNcmpOutEvent(final String subscriptionId, final String eventType, + final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) { + ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, isScheduledEvent); + } + + /** + * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format. + * + * @param subscriptionId Cm Subscription id + * @param dmiPluginName Dmi Plugin Name + * @param eventType Type of event + * @param dmiInEvent Cm Notification Subscription event for Dmi + */ + public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName, + final String eventType, final DmiInEvent dmiInEvent) { + dmiInEventProducer.publishDmiInEvent(subscriptionId, + dmiPluginName, eventType, dmiInEvent); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java new file mode 100644 index 0000000000..e79b4e6441 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription; + +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class MappersFacade { + + private final DmiInEventMapper dmiInEventMapper; + private final NcmpOutEventMapper ncmpOutEventMapper; + + /** + * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription. + * + * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates + * @return cm notification subscription dmi in event + */ + public DmiInEvent toDmiInEvent( + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + return dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates); + } + + /** + * Mapper to form a response for the client for the Cm Notification Subscription. + * + * @param subscriptionId Cm Notification Subscription id + * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin + * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client + */ + public NcmpOutEvent toNcmpOutEvent(final String subscriptionId, + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) { + return ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, + dmiSubscriptionsPerDmi); + } + + /** + * Mapper to form a rejected response for the client for the Cm Notification Subscription Request. + * + * @param subscriptionId subscription id + * @param rejectedTargetFilters list of rejected target filters for the subscription request + * @return to sent back to the client + */ + public NcmpOutEvent toNcmpOutEventForRejectedRequest( + final String subscriptionId, final List<String> rejectedTargetFilters) { + return ncmpOutEventMapper.toNcmpOutEventForRejectedRequest( + subscriptionId, rejectedTargetFilters); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java index 1d6da90a9a..a4f9be357f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java @@ -18,18 +18,18 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.config.embeddedcache; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache; import com.hazelcast.config.MapConfig; import com.hazelcast.map.IMap; import java.util.Map; import org.onap.cps.cache.HazelcastCacheConfig; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration -public class CmNotificationSubscriptionCacheConfig extends HazelcastCacheConfig { +public class CmSubscriptionConfig extends HazelcastCacheConfig { private static final MapConfig cmNotificationSubscriptionCacheMapConfig = createMapConfig("cmNotificationSubscriptionCacheMapConfig"); @@ -42,7 +42,7 @@ public class CmNotificationSubscriptionCacheConfig extends HazelcastCacheConfig * @return configured map of subscription events. */ @Bean - public IMap<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache() { + public IMap<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache() { return createHazelcastInstance("hazelCastInstanceCmNotificationSubscription", cmNotificationSubscriptionCacheMapConfig).getMap("cmNotificationSubscriptionCache"); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java index 840ab0fb92..c5052f1405 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java @@ -18,9 +18,9 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache; -import static org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus.PENDING; +import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING; import java.util.ArrayList; import java.util.Collection; @@ -32,21 +32,21 @@ import java.util.Set; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.onap.cps.ncmp.api.data.models.DatastoreType; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor -public class DmiCmNotificationSubscriptionCacheHandler { +public class DmiCacheHandler { - private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService; - private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache; + private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService; + private final Map<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache; private final InventoryPersistence inventoryPersistence; /** @@ -56,7 +56,7 @@ public class DmiCmNotificationSubscriptionCacheHandler { * @param predicates subscription request predicates */ public void add(final String subscriptionId, final List<Predicate> predicates) { - cmNotificationSubscriptionCache.put(subscriptionId, createDmiCmNotificationSubscriptionsPerDmi(predicates)); + cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates)); } /** @@ -65,7 +65,7 @@ public class DmiCmNotificationSubscriptionCacheHandler { * @param subscriptionId subscription id * @return map of dmi cm notification subscriptions per dmi */ - public Map<String, DmiCmNotificationSubscriptionDetails> get(final String subscriptionId) { + public Map<String, DmiCmSubscriptionDetails> get(final String subscriptionId) { return cmNotificationSubscriptionCache.get(subscriptionId); } @@ -75,15 +75,15 @@ public class DmiCmNotificationSubscriptionCacheHandler { * * @param subscriptionId subscription id as key in CM notification Subscription cache. */ - public void removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(final String subscriptionId) { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionsPerDmi = + public void removeAcceptedAndRejectedDmiSubscriptionEntries(final String subscriptionId) { + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = cmNotificationSubscriptionCache.get(subscriptionId); - final Map<String, DmiCmNotificationSubscriptionDetails> updatedDmiCmNotificationSubscriptionsPerDmi = - dmiCmNotificationSubscriptionsPerDmi.entrySet().stream().filter( - dmiCmNotificationSubscription -> - !isAcceptedOrRejected(dmiCmNotificationSubscription.getValue())) + final Map<String, DmiCmSubscriptionDetails> updatedDmiSubscriptionsPerDmi = + dmiSubscriptionsPerDmi.entrySet().stream() + .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected( + dmiCmNotificationSubscription.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiCmNotificationSubscriptionsPerDmi); + cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiSubscriptionsPerDmi); } /** @@ -92,9 +92,9 @@ public class DmiCmNotificationSubscriptionCacheHandler { * @param predicates CM Subscription Create Request Predicates * @return Map of DmiCmNotificationSubscription per DMI plugin */ - public Map<String, DmiCmNotificationSubscriptionDetails> createDmiCmNotificationSubscriptionsPerDmi( + public Map<String, DmiCmSubscriptionDetails> createDmiSubscriptionsPerDmi( final List<Predicate> predicates) { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi = + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = new HashMap<>(); for (final Predicate requestPredicate : predicates) { final List<String> targetFilter = requestPredicate.getTargetFilter(); @@ -103,15 +103,15 @@ public class DmiCmNotificationSubscriptionCacheHandler { final Set<String> xpaths = new HashSet<>(requestPredicate.getScopeFilter().getXpathFilter()); final Map<String, Set<String>> targetCmHandlesByDmiMap = groupTargetCmHandleIdsByDmi(targetFilter); for (final Map.Entry<String, Set<String>> targetCmHandlesByDmi: targetCmHandlesByDmiMap.entrySet()) { - final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate = - new DmiCmNotificationSubscriptionPredicate(targetCmHandlesByDmi.getValue(), + final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate = + new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(), datastoreType, xpaths); - updateDmiCmNotificationSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(), - dmiCmNotificationSubscriptionPredicate, - dmiCmNotificationSubscriptionDetailsPerDmi); + updateDmiSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(), + dmiCmSubscriptionPredicate, + dmiSubscriptionsPerDmi); } } - return dmiCmNotificationSubscriptionDetailsPerDmi; + return dmiSubscriptionsPerDmi; } /** @@ -122,13 +122,12 @@ public class DmiCmNotificationSubscriptionCacheHandler { * @param status String of status * */ - public void updateDmiCmNotificationSubscriptionStatusPerDmi(final String subscriptionId, - final String dmiServiceName, - final CmNotificationSubscriptionStatus status) { - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi = + public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName, + final CmSubscriptionStatus status) { + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = cmNotificationSubscriptionCache.get(subscriptionId); - dmiCmNotificationSubscriptionDetailsPerDmi.get(dmiServiceName).setCmNotificationSubscriptionStatus(status); - cmNotificationSubscriptionCache.put(subscriptionId, dmiCmNotificationSubscriptionDetailsPerDmi); + dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status); + cmNotificationSubscriptionCache.put(subscriptionId, dmiSubscriptionsPerDmi); } /** @@ -139,18 +138,17 @@ public class DmiCmNotificationSubscriptionCacheHandler { * */ public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) { - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList = + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) - .getDmiCmNotificationSubscriptionPredicates(); - for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate: - dmiCmNotificationSubscriptionPredicateList) { - final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType(); - final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds(); - final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths(); + .getDmiCmSubscriptionPredicates(); + for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) { + final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType(); + final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds(); + final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths(); for (final String cmHandle: cmHandles) { for (final String xpath: xpaths) { - cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType, cmHandle, + cmSubscriptionPersistenceService.addCmSubscription(datastoreType, cmHandle, xpath, subscriptionId); } } @@ -165,35 +163,34 @@ public class DmiCmNotificationSubscriptionCacheHandler { * */ public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) { - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList = + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName) - .getDmiCmNotificationSubscriptionPredicates(); - for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate: - dmiCmNotificationSubscriptionPredicateList) { - final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType(); - final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds(); - final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths(); + .getDmiCmSubscriptionPredicates(); + for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) { + final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType(); + final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds(); + final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths(); for (final String cmHandle: cmHandles) { for (final String xpath: xpaths) { - cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType, + cmSubscriptionPersistenceService.removeCmSubscription(datastoreType, cmHandle, xpath, subscriptionId); } } } } - private void updateDmiCmNotificationSubscriptionDetailsPerDmi( + private void updateDmiSubscriptionDetailsPerDmi( final String dmiServiceName, - final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate, - final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi) { - if (dmiCmNotificationSubscriptionDetailsPerDmi.containsKey(dmiServiceName)) { - dmiCmNotificationSubscriptionDetailsPerDmi.get(dmiServiceName) - .getDmiCmNotificationSubscriptionPredicates().add(dmiCmNotificationSubscriptionPredicate); + final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate, + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) { + if (dmiSubscriptionsPerDmi.containsKey(dmiServiceName)) { + dmiSubscriptionsPerDmi.get(dmiServiceName) + .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate); } else { - dmiCmNotificationSubscriptionDetailsPerDmi.put(dmiServiceName, - new DmiCmNotificationSubscriptionDetails( - new ArrayList<>(List.of(dmiCmNotificationSubscriptionPredicate)), + dmiSubscriptionsPerDmi.put(dmiServiceName, + new DmiCmSubscriptionDetails( + new ArrayList<>(List.of(dmiCmSubscriptionPredicate)), PENDING)); } } @@ -211,9 +208,8 @@ public class DmiCmNotificationSubscriptionCacheHandler { return targetCmHandlesByDmiServiceNames; } - private boolean isAcceptedOrRejected( - final DmiCmNotificationSubscriptionDetails dmiCmNotificationSubscription) { - return dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("ACCEPTED") - || dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("REJECTED"); + private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) { + return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED") + || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED"); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java index f635f1a80b..0207fb90e3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.avc; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -33,13 +33,13 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** - * Listener for AVC events. + * Listener for AVC events based on Cm Subscriptions. */ @Component @Slf4j @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class AvcEventConsumer { +public class CmAvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") @@ -50,15 +50,17 @@ public class AvcEventConsumer { /** * Incoming AvcEvent in the form of Consumer Record. * - * @param avcEventConsumerRecord Incoming raw consumer record + * @param cmAvcEventAsConsumerRecord Incoming raw consumer record */ @KafkaListener(topics = "${app.dmi.cm-events.topic}", containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) { - log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); + public void consumeAndForward( + final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) { + log.debug("Consuming AVC event {} ...", cmAvcEventAsConsumerRecord.value()); final String newEventId = UUID.randomUUID().toString(); final CloudEvent outgoingAvcEvent = - CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build(); + CloudEventBuilder.from(cmAvcEventAsConsumerRecord.value()).withId(newEventId) + .build(); eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java index 7263891a21..4ce4ef36cf 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi; import java.util.ArrayList; import java.util.HashSet; @@ -27,46 +27,44 @@ import java.util.List; import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmHandle; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Data; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Predicate; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.ScopeFilter; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Predicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.ScopeFilter; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor -public class CmNotificationSubscriptionDmiInEventMapper { +public class DmiInEventMapper { private final InventoryPersistence inventoryPersistence; /** * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription. * - * @param dmiCmNotificationSubscriptionPredicates Collection of Cm Notification Subscription predicates - * @return CmNotificationSubscriptionDmiInEvent to be sent to DMI Plugin + * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates + * @return DmiInEvent to be sent to DMI Plugin */ - public CmNotificationSubscriptionDmiInEvent toCmNotificationSubscriptionDmiInEvent( - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { - final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent = - new CmNotificationSubscriptionDmiInEvent(); + public DmiInEvent toDmiInEvent(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + final DmiInEvent dmiInEvent = new DmiInEvent(); final Data cmSubscriptionData = new Data(); - cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmNotificationSubscriptionPredicates)); - cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmhandleWithPrivateProperties( - extractUniqueCmHandleIds(dmiCmNotificationSubscriptionPredicates))); - cmNotificationSubscriptionDmiInEvent.setData(cmSubscriptionData); - return cmNotificationSubscriptionDmiInEvent; + cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmSubscriptionPredicates)); + cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmHandleWithPrivateProperties( + extractUniqueCmHandleIds(dmiCmSubscriptionPredicates))); + dmiInEvent.setData(cmSubscriptionData); + return dmiInEvent; } private List<Predicate> mapToDmiInEventPredicates( - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { final List<Predicate> predicates = new ArrayList<>(); - dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> { + dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> { final Predicate predicate = new Predicate(); final ScopeFilter scopeFilter = new ScopeFilter(); scopeFilter.setDatastore(ScopeFilter.Datastore.fromValue( @@ -81,7 +79,7 @@ public class CmNotificationSubscriptionDmiInEventMapper { } - private List<CmHandle> mapToCmSubscriptionCmhandleWithPrivateProperties(final Set<String> cmHandleIds) { + private List<CmHandle> mapToCmSubscriptionCmHandleWithPrivateProperties(final Set<String> cmHandleIds) { final List<CmHandle> cmSubscriptionCmHandles = new ArrayList<>(); @@ -99,11 +97,10 @@ public class CmNotificationSubscriptionDmiInEventMapper { } - private Set<String> extractUniqueCmHandleIds( - final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) { + private Set<String> extractUniqueCmHandleIds(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { final Set<String> cmHandleIds = new HashSet<>(); - dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll( + dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll( dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds())); return cmHandleIds; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java index 3273c556c6..c62916f05c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.producer; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -26,7 +26,7 @@ import java.net.URI; import java.util.UUID; import lombok.RequiredArgsConstructor; import org.onap.cps.events.EventsPublisher; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -35,38 +35,36 @@ import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class CmNotificationSubscriptionDmiInEventProducer { +public class DmiInEventProducer { private final EventsPublisher<CloudEvent> eventsPublisher; private final JsonObjectMapper jsonObjectMapper; @Value("${app.ncmp.avc.cm-subscription-dmi-in}") - private String cmNotificationSubscriptionDmiInEventTopic; + private String dmiInEventTopic; /** * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format. * - * @param subscriptionId Cm Subscription Id - * @param dmiPluginName Dmi Plugin Name - * @param eventType Type of event - * @param cmNotificationSubscriptionDmiInEvent Cm Notification Subscription event for Dmi + * @param subscriptionId Cm Subscription Id + * @param dmiPluginName Dmi Plugin Name + * @param eventType Type of event + * @param dmiInEvent Cm Notification Subscription event for Dmi */ - public void publishCmNotificationSubscriptionDmiInEvent(final String subscriptionId, final String dmiPluginName, - final String eventType, final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent) { - eventsPublisher.publishCloudEvent(cmNotificationSubscriptionDmiInEventTopic, subscriptionId, - buildAndGetCmNotificationDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, - cmNotificationSubscriptionDmiInEvent)); + public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName, + final String eventType, final DmiInEvent dmiInEvent) { + eventsPublisher.publishCloudEvent(dmiInEventTopic, subscriptionId, + buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)); } - private CloudEvent buildAndGetCmNotificationDmiInEventAsCloudEvent(final String subscriptionId, - final String dmiPluginName, final String eventType, - final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent) { + private CloudEvent buildAndGetDmiInEventAsCloudEvent(final String subscriptionId, + final String dmiPluginName, final String eventType, final DmiInEvent dmiInEvent) { return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType) .withSource(URI.create("NCMP")) .withDataSchema(URI.create("org.onap.ncmp.dmi.cm.subscription:1.0.0")) .withExtension("correlationid", subscriptionId.concat("#").concat(dmiPluginName)) - .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionDmiInEvent)).build(); + .withData(jsonObjectMapper.asJsonBytes(dmiInEvent)).build(); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java new file mode 100644 index 0000000000..2a45818624 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi; + +import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED; +import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED; +import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; + +import io.cloudevents.CloudEvent; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.api.NcmpResponseStatus; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class DmiOutEventConsumer { + + private final DmiCacheHandler dmiCacheHandler; + private final EventsFacade eventsFacade; + private final MappersFacade mappersFacade; + + private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#"; + + /** + * Consume the Cm Notification Subscription event from the dmi-plugin. + * + * @param dmiOutEventAsConsumerRecord the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") + public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) { + final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value(); + final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class); + final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid")); + if (dmiOutEvent != null && correlationId != null) { + final String eventType = cloudEvent.getType(); + handleDmiOutEvent(correlationId, eventType, dmiOutEvent); + } + } + + private void handleDmiOutEvent(final String correlationId, final String eventType, + final DmiOutEvent dmiOutEvent) { + final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0]; + final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1]; + + if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) { + handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED); + if (eventType.equals("subscriptionCreateResponse")) { + dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); + } + if (eventType.equals("subscriptionDeleteResponse")) { + dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName); + } + handleEventsStatusPerDmi(subscriptionId, eventType); + } + + if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) { + handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED); + handleEventsStatusPerDmi(subscriptionId, eventType); + } + + log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId, + dmiPluginName, dmiOutEvent.getData().getStatusMessage()); + } + + private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName, + final CmSubscriptionStatus cmSubscriptionStatus) { + dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, + cmSubscriptionStatus); + } + + private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) { + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = + dmiCacheHandler.get(subscriptionId); + final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId, + dmiSubscriptionsPerDmi); + eventsFacade.publishNcmpOutEvent(subscriptionId, eventType, + ncmpOutEvent, false); + } + + private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus, + final Data dmiOutData) { + return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode()) + && ncmpResponseStatus.getMessage() + .equals(dmiOutData.getStatusMessage()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java index 68d54fac95..5b7c46ed00 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java @@ -18,15 +18,15 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.model; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.models; -public enum CmNotificationSubscriptionStatus { +public enum CmSubscriptionStatus { ACCEPTED("ACCEPTED"), REJECTED("REJECTED"), PENDING("PENDING"); private final String cmNotificationSubscriptionStatusValue; - CmNotificationSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) { + CmSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) { this.cmNotificationSubscriptionStatusValue = cmNotificationSubscriptionStatusValue; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java index 95757e7240..dbc607ad27 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.model; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.models; import java.util.List; import lombok.AllArgsConstructor; @@ -28,8 +28,8 @@ import lombok.Setter; @Getter @Setter @AllArgsConstructor -public class DmiCmNotificationSubscriptionDetails { +public class DmiCmSubscriptionDetails { - private List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates; - private CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus; + private List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates; + private CmSubscriptionStatus cmSubscriptionStatus; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java index 40c0188fa0..84d3aead8c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.model; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.models; import java.util.Set; import lombok.AllArgsConstructor; @@ -29,7 +29,7 @@ import org.onap.cps.ncmp.api.data.models.DatastoreType; @Getter @Setter @AllArgsConstructor -public class DmiCmNotificationSubscriptionPredicate { +public class DmiCmSubscriptionPredicate { private Set<String> targetCmHandleIds; private DatastoreType datastoreType; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java new file mode 100644 index 0000000000..d7f15a2c72 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.api.data.models.DatastoreType; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class CmSubscriptionComparator { + + private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService; + + /** + * Get the new Dmi Predicates for a given predicates list. + * + * @param existingDmiCmSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates + * @return new list of DmiCmNotificationSubscriptionPredicates + */ + public List<DmiCmSubscriptionPredicate> getNewDmiSubscriptionPredicates( + final List<DmiCmSubscriptionPredicate> existingDmiCmSubscriptionPredicates) { + final List<DmiCmSubscriptionPredicate> newDmiCmSubscriptionPredicates = + new ArrayList<>(); + + for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : existingDmiCmSubscriptionPredicates) { + + final Set<String> targetCmHandleIds = new HashSet<>(); + final Set<String> xpaths = new HashSet<>(); + final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType(); + + for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) { + for (final String xpath : dmiCmSubscriptionPredicate.getXpaths()) { + if (!cmSubscriptionPersistenceService.isOngoingCmSubscription(datastoreType, + cmHandleId, xpath)) { + xpaths.add(xpath); + targetCmHandleIds.add(cmHandleId); + + } + } + } + + populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType, + newDmiCmSubscriptionPredicates); + } + return newDmiCmSubscriptionPredicates; + } + + private void populateValidDmiSubscriptionPredicates(final Set<String> targetCmHandleIds, + final Set<String> xpaths, final DatastoreType datastoreType, + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + if (!(targetCmHandleIds.isEmpty() || xpaths.isEmpty())) { + final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate = + new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths); + dmiCmSubscriptionPredicates.add(dmiCmSubscriptionPredicate); + } + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java index 1c52ffa798..3a9b2066b2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java @@ -18,12 +18,12 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.service; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; import java.util.List; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; -public interface CmNotificationSubscriptionHandlerService { +public interface CmSubscriptionHandler { /** * Process cm notification subscription create request. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java new file mode 100644 index 0000000000..e225b705d7 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java @@ -0,0 +1,121 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { + + private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService; + private final CmSubscriptionComparator cmSubscriptionComparator; + private final MappersFacade mappersFacade; + private final EventsFacade eventsFacade; + private final DmiCacheHandler dmiCacheHandler; + + @Override + public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) { + if (cmSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) { + dmiCacheHandler.add(subscriptionId, predicates); + handleNewCmSubscription(subscriptionId); + scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse"); + } else { + rejectAndPublishCreateRequest(subscriptionId, predicates); + } + } + + @Override + public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) { + dmiCacheHandler.add(subscriptionId, predicates); + sendSubscriptionDeleteRequestToDmi(subscriptionId); + scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse"); + } + + private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) { + eventsFacade.publishNcmpOutEvent(subscriptionId, eventType, null, true); + } + + private void rejectAndPublishCreateRequest(final String subscriptionId, final List<Predicate> predicates) { + final Set<String> subscriptionTargetFilters = + predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream()) + .collect(Collectors.toSet()); + final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEventForRejectedRequest(subscriptionId, + new ArrayList<>(subscriptionTargetFilters)); + eventsFacade.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false); + } + + private void handleNewCmSubscription(final String subscriptionId) { + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = + dmiCacheHandler.get(subscriptionId); + dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = + cmSubscriptionComparator.getNewDmiSubscriptionPredicates( + dmiSubscriptionDetails.getDmiCmSubscriptionPredicates()); + + if (dmiCmSubscriptionPredicates.isEmpty()) { + acceptAndPublishNcmpOutEventPerDmi(subscriptionId, dmiPluginName); + } else { + publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates); + } + }); + } + + private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName, + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + final DmiInEvent dmiInEvent = mappersFacade.toDmiInEvent(dmiCmSubscriptionPredicates); + eventsFacade.publishDmiInEvent(subscriptionId, dmiPluginName, + "subscriptionCreateRequest", dmiInEvent); + } + + private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) { + dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName, + CmSubscriptionStatus.ACCEPTED); + dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName); + } + + private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) { + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = + dmiCacheHandler.get(subscriptionId); + dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { + final DmiInEvent dmiInEvent = mappersFacade.toDmiInEvent( + dmiSubscriptionDetails.getDmiCmSubscriptionPredicates()); + eventsFacade.publishDmiInEvent(subscriptionId, dmiPluginName, + "subscriptionDeleteRequest", dmiInEvent); + }); + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java index 65f4ee8c89..1e1359dd0d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent; @@ -27,44 +27,43 @@ import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionHandlerService; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent; -import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component @Slf4j @RequiredArgsConstructor -public class CmNotificationSubscriptionNcmpInEventConsumer { +public class NcmpInEventConsumer { - private final CmNotificationSubscriptionHandlerService cmNotificationSubscriptionHandlerService; + private final CmSubscriptionHandler cmSubscriptionHandler; /** * Consume the specified event. * - * @param subscriptionEventConsumerRecord the event to be consumed + * @param ncmpInEventAsConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}", containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) { - final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value(); - final CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent = - toTargetEvent(cloudEvent, CmNotificationSubscriptionNcmpInEvent.class); + public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> ncmpInEventAsConsumerRecord) { + final CloudEvent cloudEvent = ncmpInEventAsConsumerRecord.value(); + final NcmpInEvent ncmpInEvent = + toTargetEvent(cloudEvent, NcmpInEvent.class); log.info("Subscription with name {} to be mapped to hazelcast object...", - cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()); + ncmpInEvent.getData().getSubscriptionId()); - final String subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId(); - final List<Predicate> predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates(); + final String subscriptionId = ncmpInEvent.getData().getSubscriptionId(); + final List<Predicate> predicates = ncmpInEvent.getData().getPredicates(); if ("subscriptionCreateRequest".equals(cloudEvent.getType())) { log.info("Subscription create request for source {} with subscription id {} ...", cloudEvent.getSource(), subscriptionId); - cmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest(subscriptionId, predicates); + cmSubscriptionHandler.processSubscriptionCreateRequest(subscriptionId, predicates); } if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) { log.info("Subscription delete request for source {} with subscription id {} ...", cloudEvent.getSource(), subscriptionId); - cmNotificationSubscriptionHandlerService.processSubscriptionDeleteRequest(subscriptionId, predicates); + cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java new file mode 100644 index 0000000000..ffd4b014fb --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class NcmpOutEventMapper { + + /** + * Mapper to form a response for the client for the Cm Notification Subscription. + * + * @param subscriptionId Cm Notification Subscription Id + * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin + * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client + */ + public NcmpOutEvent toNcmpOutEvent(final String subscriptionId, + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) { + + final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent(); + final Data cmSubscriptionData = new Data(); + cmSubscriptionData.setSubscriptionId(subscriptionId); + populateNcmpOutEventWithCmHandleIds(dmiSubscriptionsPerDmi, + cmSubscriptionData); + ncmpOutEvent.setData(cmSubscriptionData); + + return ncmpOutEvent; + } + + /** + * Mapper to form a rejected response for the client for the Cm Notification Subscription Request. + * + * @param subscriptionId subscription id + * @param rejectedTargetFilters list of rejected target filters for the subscription request + * @return to sent back to the client + */ + public NcmpOutEvent toNcmpOutEventForRejectedRequest(final String subscriptionId, + final List<String> rejectedTargetFilters) { + final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent(); + final Data cmSubscriptionData = new Data(); + cmSubscriptionData.setSubscriptionId(subscriptionId); + cmSubscriptionData.setRejectedTargets(rejectedTargetFilters); + ncmpOutEvent.setData(cmSubscriptionData); + return ncmpOutEvent; + } + + private void populateNcmpOutEventWithCmHandleIds( + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi, + final Data cmSubscriptionData) { + + final List<String> acceptedCmHandleIds = new ArrayList<>(); + final List<String> pendingCmHandleIds = new ArrayList<>(); + final List<String> rejectedCmHandleIds = new ArrayList<>(); + + dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> { + final CmSubscriptionStatus cmSubscriptionStatus = + dmiSubscriptionDetails.getCmSubscriptionStatus(); + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates = + dmiSubscriptionDetails.getDmiCmSubscriptionPredicates(); + + switch (cmSubscriptionStatus) { + case ACCEPTED -> acceptedCmHandleIds.addAll( + extractCmHandleIds(dmiCmSubscriptionPredicates)); + case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates)); + default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates)); + } + }); + + cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds); + cmSubscriptionData.setPendingTargets(pendingCmHandleIds); + cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds); + + } + + private List<String> extractCmHandleIds( + final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) { + final List<String> cmHandleIds = new ArrayList<>(); + dmiCmSubscriptionPredicates.forEach(dmiSubscriptionPredicate -> cmHandleIds.addAll( + dmiSubscriptionPredicate.getTargetCmHandleIds())); + + return cmHandleIds; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java new file mode 100644 index 0000000000..92800f4af1 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java @@ -0,0 +1,135 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import java.net.URI; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.events.EventsPublisher; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.onap.cps.utils.JsonObjectMapper; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) +public class NcmpOutEventProducer { + + @Value("${app.ncmp.avc.cm-subscription-ncmp-out}") + private String ncmpOutEventTopic; + + @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}") + private Integer dmiOutEventTimeoutInMs; + + private final EventsPublisher<CloudEvent> eventsPublisher; + private final JsonObjectMapper jsonObjectMapper; + private final MappersFacade mappersFacade; + private final DmiCacheHandler dmiCacheHandler; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>(); + + /** + * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud + * Event compliant. + * + * @param subscriptionId Cm Subscription Id + * @param eventType Type of event + * @param ncmpOutEvent Cm Notification Subscription Event for the + * client + * @param isScheduledEvent Determines if the event is to be scheduled + * or published now + */ + public void publishNcmpOutEvent(final String subscriptionId, final String eventType, + final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) { + + if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) { + final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType); + scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture); + log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId); + } else { + cancelScheduledTaskForSubscriptionId(subscriptionId); + publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent); + log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId); + } + } + + private ScheduledFuture<?> scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) { + final NcmpOutEventPublishingTask ncmpOutEventPublishingTask = + new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher, + jsonObjectMapper, mappersFacade, dmiCacheHandler); + return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs, + TimeUnit.MILLISECONDS); + } + + private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) { + + final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId); + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduledTasksPerSubscriptionId.remove(subscriptionId); + } + + } + + + private void publishNcmpOutEventNow(final String subscriptionId, final String eventType, + final NcmpOutEvent ncmpOutEvent) { + final CloudEvent ncmpOutEventAsCloudEvent = + buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, + ncmpOutEvent); + eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, + ncmpOutEventAsCloudEvent); + dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); + } + + /** + * Get an NCMP out event as cloud event. + * + * @param jsonObjectMapper JSON object mapper + * @param subscriptionId subscription id + * @param eventType event type + * @param ncmpOutEvent cm notification subscription NCMP out event + * @return cm notification subscription NCMP out event as cloud event + */ + public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent(final JsonObjectMapper jsonObjectMapper, + final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) { + + return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType) + .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0")) + .withExtension("correlationid", subscriptionId) + .withData(jsonObjectMapper.asJsonBytes(ncmpOutEvent)).build(); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java new file mode 100644 index 0000000000..5636237566 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp; + +import static org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent; + +import io.cloudevents.CloudEvent; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.events.EventsPublisher; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler; +import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails; +import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent; +import org.onap.cps.utils.JsonObjectMapper; + +@Slf4j +@RequiredArgsConstructor +public class NcmpOutEventPublishingTask implements Runnable { + + private final String topicName; + private final String subscriptionId; + private final String eventType; + private final EventsPublisher<CloudEvent> eventsPublisher; + private final JsonObjectMapper jsonObjectMapper; + private final MappersFacade mappersFacade; + private final DmiCacheHandler dmiCacheHandler; + + /** + * Delegating the responsibility of publishing NcmpOutEvent as a separate task which will + * be called after a specified delay. + */ + @Override + public void run() { + final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = + dmiCacheHandler.get(subscriptionId); + final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId, + dmiSubscriptionsPerDmi); + eventsPublisher.publishCloudEvent(topicName, subscriptionId, + buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType, + ncmpOutEvent)); + dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java index e2480c5e56..c24507a1a7 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java @@ -1,7 +1,6 @@ /* * ============LICENSE_START======================================================= * Copyright (C) 2024 Nordix Foundation - * Modifications Copyright (C) 2024 TechMahindra Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.cmsubscription.service; +package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils; import static org.onap.cps.spi.FetchDescendantsOption.DIRECT_CHILDREN_ONLY; import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS; @@ -43,7 +42,10 @@ import org.springframework.stereotype.Service; @Slf4j @Service @RequiredArgsConstructor -public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotificationSubscriptionPersistenceService { +public class CmSubscriptionPersistenceService { + + private static final String NCMP_DATASPACE_NAME = "NCMP-Admin"; + private static final String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions"; private static final String SUBSCRIPTION_ANCHOR_NAME = "cm-data-subscriptions"; private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE = """ @@ -64,22 +66,40 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif private final CpsQueryService cpsQueryService; private final CpsDataService cpsDataService; - @Override - public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath) { - return !getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty(); + /** + * Check if we have an ongoing cm subscription based on the parameters. + * + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath + * @return true for ongoing cmsubscription , otherwise false + */ + public boolean isOngoingCmSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath) { + return !getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty(); } - @Override + /** + * Check if the subscription ID is unique against ongoing subscriptions. + * + * @param subscriptionId subscription ID + * @return true if subscriptionId is not used in active subscriptions, otherwise false + */ public boolean isUniqueSubscriptionId(final String subscriptionId) { return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, - CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), - OMIT_DESCENDANTS).isEmpty(); + CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), OMIT_DESCENDANTS).isEmpty(); } - @Override - public Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType, - final String cmHandleId, final String xpath) { + /** + * Get all ongoing cm notification subscription based on the parameters. + * + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath + * @return collection of subscription ids of ongoing cm notification subscription + */ + public Collection<String> getOngoingCmSubscriptionIds(final DatastoreType datastoreType, + final String cmHandleId, final String xpath) { final String isOngoingCmSubscriptionCpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted( @@ -93,24 +113,38 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif return (List<String>) existingNodes.iterator().next().getLeaves().get("subscriptionIds"); } - @Override - public void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath, final String subscriptionId) { - final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType, - cmHandleId, xpath); + /** + * Add cm notification subscription. + * + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath + * @param newSubscriptionId subscription id to be added + */ + public void addCmSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String newSubscriptionId) { + final Collection<String> subscriptionIds = + getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath); if (subscriptionIds.isEmpty()) { - addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, subscriptionId); - } else if (!subscriptionIds.contains(subscriptionId)) { - subscriptionIds.add(subscriptionId); + addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, newSubscriptionId); + } else if (!subscriptionIds.contains(newSubscriptionId)) { + subscriptionIds.add(newSubscriptionId); saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds); } } - @Override - public void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath, final String subscriptionId) { - final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType, - cmHandleId, xpath); + /** + * Remove cm notification Subscription. + * + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath + * @param subscriptionId subscription id to remove + */ + public void removeCmSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String subscriptionId) { + final Collection<String> subscriptionIds = + getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath); if (subscriptionIds.remove(subscriptionId)) { saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds); log.info("There are subscribers left for the following cps path {} :", @@ -126,16 +160,17 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif } private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId, - final String xpath) { + final String xpath) { cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted( datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)), OffsetDateTime.now()); final Collection<DataNode> existingFiltersForCmHandle = cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME, - CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted( - datastoreType.getDatastoreName(), cmHandleId), - DIRECT_CHILDREN_ONLY).iterator().next().getChildDataNodes(); + CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted( + datastoreType.getDatastoreName(), cmHandleId), + DIRECT_CHILDREN_ONLY).iterator().next() + .getChildDataNodes(); if (existingFiltersForCmHandle.isEmpty()) { removeCmHandleFromDatastore(datastoreType.getDatastoreName(), cmHandleId); } @@ -143,46 +178,43 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif private void removeCmHandleFromDatastore(final String datastoreName, final String cmHandleId) { cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, - CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted( - datastoreName, cmHandleId), OffsetDateTime.now()); + CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, cmHandleId), + OffsetDateTime.now()); } private boolean isFirstSubscriptionForCmHandle(final DatastoreType datastoreType, final String cmHandleId) { return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME, CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted( - datastoreType.getDatastoreName(), cmHandleId), - OMIT_DESCENDANTS).isEmpty(); + datastoreType.getDatastoreName(), cmHandleId), OMIT_DESCENDANTS).isEmpty(); } private void addFirstSubscriptionForDatastoreCmHandleAndXpath(final DatastoreType datastoreType, - final String cmHandleId, - final String xpath, - final String subscriptionId) { + final String cmHandleId, final String xpath, final String subscriptionId) { final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId); final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, newSubscriptionList); if (isFirstSubscriptionForCmHandle(datastoreType, cmHandleId)) { - final String parentXpath = "/datastores/datastore[@name='%s']/cm-handles" - .formatted(datastoreType.getDatastoreName()); - final String subscriptionAsJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}", - cmHandleId, subscriptionDetailsAsJson); + final String parentXpath = + "/datastores/datastore[@name='%s']/cm-handles".formatted(datastoreType.getDatastoreName()); + final String subscriptionAsJson = + String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}", cmHandleId, + subscriptionDetailsAsJson); cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson, OffsetDateTime.now(), ContentType.JSON); } else { cpsDataService.saveListElements(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME, CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted( - datastoreType.getDatastoreName(), cmHandleId), - subscriptionDetailsAsJson, OffsetDateTime.now()); + datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, + OffsetDateTime.now()); } } - private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId, - final String xpath, - final Collection<String> subscriptionIds) { + private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId, final String xpath, + final Collection<String> subscriptionIds) { final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, subscriptionIds); cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME, CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted( - datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, - OffsetDateTime.now(), ContentType.JSON); + datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, OffsetDateTime.now(), + ContentType.JSON); } private String getSubscriptionDetailsAsJson(final String xpath, final Collection<String> subscriptionIds) { @@ -194,4 +226,6 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif private static String escapeQuotesByDoublingThem(final String inputXpath) { return inputXpath.replace("'", "''"); } + } + |