aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
authormpriyank <priyank.maheshwari@est.tech>2024-07-01 12:32:27 +0100
committermpriyank <priyank.maheshwari@est.tech>2024-07-04 10:15:47 +0100
commitbac230bdbe224023d424eda2cd2ef7422f5c3ed6 (patch)
tree6690acdffa3fbe62f0057bbb66c3812841ace141 /cps-ncmp-service/src/main
parent620ab09e76ae4c9072f799687b4e4ae41939df1e (diff)
refactor cmsubscription code
- Moved cmsubscription to the impl package instead of api. - Below packages are renamed or newly created for code and tests - moved ..api.cmsubsription to ..impl.cmsubscription - renamed ..api.cmsubscription.mappers to ..impl.cmsubscription.ncmp or dmi as per the functionality - renamed ..api.cmsubscription.model to ..impl.cmsubscription.models - removed ..api.cmsubscription.producer/consumer and classes moved to ..impl.cmsubscription.ncmp/dmi - new package ..impl.cmsubscription.cache to hold cache config and cache operations - new package ..impl.cmsubscription.avc to hold the CmAvcEventConsumer for the notifications emitted as a result of the subscriptions - Removed the prefix CmNotificationSubscription for majority of the classes , not removed for Config and model classes - Renamed the schemas and package structure as per the code refactoring Note: *EventsFacade and *MappersFacade to be removed in the next patchset Issue-ID: CPS-2298 Change-Id: I1d788ab745d65965570e28beaefa511cbe4a8547 Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java82
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java70
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java78
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java63
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java123
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java114
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java146
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java134
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java84
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java65
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java77
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java)8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java)120
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java)16
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java)47
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java)32
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java118
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java)6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java)8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java)4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java83
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java)6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java121
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java)31
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java112
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java135
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java63
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java)130
28 files changed, 991 insertions, 1085 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java
deleted file mode 100644
index ff322ee3cc..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionDelta {
-
- private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
-
- /**
- * Get the delta for a given predicates list.
- *
- * @param dmiCmNotificationSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates
- * @return delta list of DmiCmNotificationSubscriptionPredicates
- */
- public List<DmiCmNotificationSubscriptionPredicate> getDelta(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- final List<DmiCmNotificationSubscriptionPredicate> delta = new ArrayList<>();
-
- for (final DmiCmNotificationSubscriptionPredicate cmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicates) {
-
- final Set<String> targetCmHandleIds = new HashSet<>();
- final Set<String> xpaths = new HashSet<>();
- final DatastoreType datastoreType = cmNotificationSubscriptionPredicate.getDatastoreType();
-
- for (final String cmHandleId : cmNotificationSubscriptionPredicate.getTargetCmHandleIds()) {
- for (final String xpath : cmNotificationSubscriptionPredicate.getXpaths()) {
- if (!cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType,
- cmHandleId, xpath)) {
- xpaths.add(xpath);
- targetCmHandleIds.add(cmHandleId);
-
- }
- }
- }
-
- populateValidDmiCmNotificationSubscriptionPredicateDelta(targetCmHandleIds, xpaths, datastoreType, delta);
- }
- return delta;
- }
-
- private void populateValidDmiCmNotificationSubscriptionPredicateDelta(final Set<String> targetCmHandleIds,
- final Set<String> xpaths, final DatastoreType datastoreType,
- final List<DmiCmNotificationSubscriptionPredicate> delta) {
- if (!(targetCmHandleIds.isEmpty() || xpaths.isEmpty())) {
- final DmiCmNotificationSubscriptionPredicate predicateDelta =
- new DmiCmNotificationSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
- delta.add(predicateDelta);
- }
- }
-
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java
deleted file mode 100644
index 50a5df537d..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionDmiInEventProducer;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionEventsHandler {
- private final CmNotificationSubscriptionNcmpOutEventProducer cmNotificationSubscriptionNcmpOutEventProducer;
- private final CmNotificationSubscriptionDmiInEventProducer cmNotificationSubscriptionDmiInEventProducer;
-
- /**
- * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
- * Event compliant.
- *
- * @param subscriptionId Cm Subscription id
- * @param eventType Type of event
- * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the
- * client
- * @param isScheduledEvent Determines if the event is to be scheduled
- * or published now
- */
- public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent
- cmNotificationSubscriptionNcmpOutEvent,
- final boolean isScheduledEvent) {
- cmNotificationSubscriptionNcmpOutEventProducer.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- eventType, cmNotificationSubscriptionNcmpOutEvent, isScheduledEvent);
- }
-
- /**
- * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
- *
- * @param subscriptionId Cm Subscription id
- * @param dmiPluginName Dmi Plugin Name
- * @param eventType Type of event
- * @param cmNotificationSubscriptionDmiInEvent Cm Notification Subscription event for Dmi
- */
- public void publishCmNotificationSubscriptionDmiInEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType,
- final CmNotificationSubscriptionDmiInEvent
- cmNotificationSubscriptionDmiInEvent) {
- cmNotificationSubscriptionDmiInEventProducer.publishCmNotificationSubscriptionDmiInEvent(subscriptionId,
- dmiPluginName, eventType, cmNotificationSubscriptionDmiInEvent);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java
deleted file mode 100644
index 73f9563ecf..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionDmiInEventMapper;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionMappersHandler {
-
- private final CmNotificationSubscriptionDmiInEventMapper cmNotificationSubscriptionDmiInEventMapper;
- private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper;
-
- /**
- * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
- *
- * @param dmiCmNotificationSubscriptionPredicates Collection of Cm Notification Subscription predicates
- * @return cm notification subscription dmi in event
- */
- public CmNotificationSubscriptionDmiInEvent toCmNotificationSubscriptionDmiInEvent(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- return cmNotificationSubscriptionDmiInEventMapper.toCmNotificationSubscriptionDmiInEvent(
- dmiCmNotificationSubscriptionPredicates);
- }
-
- /**
- * Mapper to form a response for the client for the Cm Notification Subscription.
- *
- * @param subscriptionId Cm Notification Subscription id
- * @param dmiCmNotificationSubscriptionDetailsMap contains CmNotificationSubscriptionDetails per dmi plugin
- * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap) {
- return cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- dmiCmNotificationSubscriptionDetailsMap);
- }
-
- /**
- * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
- *
- * @param subscriptionId subscription id
- * @param rejectedTargetFilters list of rejected target filters for the subscription request
- * @return to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- final String subscriptionId, final List<String> rejectedTargetFilters) {
- return cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- subscriptionId, rejectedTargetFilters);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
deleted file mode 100644
index f7dd51e637..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer.buildAndGetCmNotificationNcmpOutEventAsCloudEvent;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.onap.cps.utils.JsonObjectMapper;
-
-@Slf4j
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Runnable {
-
- private final String topicName;
- private final String subscriptionId;
- private final String eventType;
- private final EventsPublisher<CloudEvent> eventsPublisher;
- private final JsonObjectMapper jsonObjectMapper;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
-
- /**
- * Delegating the responsibility of publishing CmNotificationSubscriptionNcmpOutEvent as a separate task which will
- * be called after a specified delay.
- */
- @Override
- public void run() {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- dmiCmNotificationSubscriptionDetailsMap);
- eventsPublisher.publishCloudEvent(topicName, subscriptionId,
- buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
- cmNotificationSubscriptionNcmpOutEvent));
- dmiCmNotificationSubscriptionCacheHandler
- .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java
deleted file mode 100644
index 978a4cdfe2..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer;
-
-import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
-import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
-import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.api.NcmpResponseStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionDmiOutEventConsumer {
-
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
- private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
-
- private static final String CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
-
- /**
- * Consume the Cm Notification Subscription event from the dmi-plugin.
- *
- * @param cmNotificationSubscriptionDmiOutEventConsumerRecord the event to be consumed
- */
- @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeCmNotificationSubscriptionDmiOutEvent(
- final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiOutEventConsumerRecord) {
- final CloudEvent cloudEvent = cmNotificationSubscriptionDmiOutEventConsumerRecord.value();
- final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
- toTargetEvent(cloudEvent, CmNotificationSubscriptionDmiOutEvent.class);
- final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
- if (cmNotificationSubscriptionDmiOutEvent != null && correlationId != null) {
- final String eventType = cloudEvent.getType();
- handleCmSubscriptionDmiOutEvent(correlationId, eventType, cmNotificationSubscriptionDmiOutEvent);
- }
- }
-
- private void handleCmSubscriptionDmiOutEvent(final String correlationId,
- final String eventType,
- final CmNotificationSubscriptionDmiOutEvent
- cmNotificationSubscriptionDmiOutEvent) {
- final String subscriptionId = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
- final String dmiPluginName = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
-
- if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, cmNotificationSubscriptionDmiOutEvent.getData())) {
- handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
- if (eventType.equals("subscriptionCreateResponse")) {
- dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
- }
- if (eventType.equals("subscriptionDeleteResponse")) {
- dmiCmNotificationSubscriptionCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
- }
- handleEventsStatusPerDmi(subscriptionId, eventType);
- }
-
- if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, cmNotificationSubscriptionDmiOutEvent.getData())) {
- handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
- handleEventsStatusPerDmi(subscriptionId, eventType);
- }
-
- log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
- dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
- }
-
- private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
- final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
- dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
- dmiPluginName, cmNotificationSubscriptionStatus);
- }
-
- private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- dmiCmNotificationSubscriptionDetailsPerDmi);
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- eventType, cmNotificationSubscriptionNcmpOutEvent, false);
- }
-
- private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
- final Data cmNotificationSubscriptionDmiOutData) {
- return ncmpResponseStatus.getCode().equals(cmNotificationSubscriptionDmiOutData.getStatusCode())
- && ncmpResponseStatus.getMessage()
- .equals(cmNotificationSubscriptionDmiOutData.getStatusMessage());
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java
deleted file mode 100644
index ea21751691..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionNcmpOutEventMapper {
-
- /**
- * Mapper to form a response for the client for the Cm Notification Subscription.
- *
- * @param subscriptionId Cm Notification Subscription Id
- * @param dmiCmNotificationSubscriptionDetailsMap contains CmNotificationSubscriptionDetails per dmi plugin
- * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap) {
-
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- new CmNotificationSubscriptionNcmpOutEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setSubscriptionId(subscriptionId);
- populateCmNotificationSubscriptionNcmpOutEventWithCmHandleIds(dmiCmNotificationSubscriptionDetailsMap,
- cmSubscriptionData);
- cmNotificationSubscriptionNcmpOutEvent.setData(cmSubscriptionData);
-
- return cmNotificationSubscriptionNcmpOutEvent;
- }
-
- /**
- * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
- *
- * @param subscriptionId subscription id
- * @param rejectedTargetFilters list of rejected target filters for the subscription request
- * @return to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- final String subscriptionId, final List<String> rejectedTargetFilters) {
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- new CmNotificationSubscriptionNcmpOutEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setSubscriptionId(subscriptionId);
- cmSubscriptionData.setRejectedTargets(rejectedTargetFilters);
- cmNotificationSubscriptionNcmpOutEvent.setData(cmSubscriptionData);
- return cmNotificationSubscriptionNcmpOutEvent;
- }
-
- private void populateCmNotificationSubscriptionNcmpOutEventWithCmHandleIds(
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap,
- final Data cmSubscriptionData) {
-
- final List<String> acceptedCmHandleIds = new ArrayList<>();
- final List<String> pendingCmHandleIds = new ArrayList<>();
- final List<String> rejectedCmHandleIds = new ArrayList<>();
-
- dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> {
- final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus =
- dmiCmNotificationSubscriptionDetails.getCmNotificationSubscriptionStatus();
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates =
- dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates();
-
- switch (cmNotificationSubscriptionStatus) {
- case ACCEPTED -> acceptedCmHandleIds.addAll(
- extractCmHandleIds(dmiCmNotificationSubscriptionPredicates));
- case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmNotificationSubscriptionPredicates));
- default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmNotificationSubscriptionPredicates));
- }
- });
-
- cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds);
- cmSubscriptionData.setPendingTargets(pendingCmHandleIds);
- cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds);
-
- }
-
- private List<String> extractCmHandleIds(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- final List<String> cmHandleIds = new ArrayList<>();
- dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
- dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
-
- return cmHandleIds;
- }
-
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java
deleted file mode 100644
index ed7ed2a0ba..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.producer;
-
-import io.cloudevents.CloudEvent;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import java.net.URI;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionNcmpOutEventPublishingTask;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.onap.cps.utils.JsonObjectMapper;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class CmNotificationSubscriptionNcmpOutEventProducer {
-
- @Value("${app.ncmp.avc.cm-subscription-ncmp-out}")
- private String cmNotificationSubscriptionNcmpOutEventTopic;
-
- @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
- private Integer cmNotificationSubscriptionDmiOutEventTimeoutInMs;
-
- private final EventsPublisher<CloudEvent> eventsPublisher;
- private final JsonObjectMapper jsonObjectMapper;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
-
- /**
- * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
- * Event compliant.
- *
- * @param subscriptionId Cm Subscription Id
- * @param eventType Type of event
- * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the
- * client
- * @param isScheduledEvent Determines if the event is to be scheduled
- * or published now
- */
- public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent
- cmNotificationSubscriptionNcmpOutEvent,
- final boolean isScheduledEvent) {
-
- if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
- final ScheduledFuture<?> scheduledFuture =
- scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType);
- scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture);
- log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId);
- } else {
- cancelScheduledTaskForSubscriptionId(subscriptionId);
- publishCmNotificationSubscriptionNcmpOutEventNow(subscriptionId, eventType,
- cmNotificationSubscriptionNcmpOutEvent);
- log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
- }
- }
-
- private ScheduledFuture<?> scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final String eventType) {
- final CmNotificationSubscriptionNcmpOutEventPublishingTask
- cmNotificationSubscriptionNcmpOutEventPublishingTask =
- new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic,
- subscriptionId, eventType, eventsPublisher, jsonObjectMapper,
- cmNotificationSubscriptionMappersHandler, dmiCmNotificationSubscriptionCacheHandler);
- return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask,
- cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS);
- }
-
- private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) {
-
- final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId);
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- scheduledTasksPerSubscriptionId.remove(subscriptionId);
- }
-
- }
-
-
- private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent
- cmNotificationSubscriptionNcmpOutEvent) {
- final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent =
- buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
- cmNotificationSubscriptionNcmpOutEvent);
- eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId,
- cmNotificationSubscriptionNcmpOutEventAsCloudEvent);
- dmiCmNotificationSubscriptionCacheHandler
- .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId);
- }
-
- /**
- * Get an NCMP out event as cloud event.
- *
- * @param jsonObjectMapper JSON object mapper
- * @param subscriptionId subscription id
- * @param eventType event type
- * @param cmNotificationSubscriptionNcmpOutEvent cm notification subscription NCMP out event
- * @return cm notification subscription NCMP out event as cloud event
- */
- public static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent(
- final JsonObjectMapper jsonObjectMapper, final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
-
- return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
- .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
- .withExtension("correlationid", subscriptionId)
- .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build();
- }
-
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java
deleted file mode 100644
index 08e3c95529..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionDelta;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.stereotype.Service;
-
-@Service
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionHandlerServiceImpl implements CmNotificationSubscriptionHandlerService {
-
- private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
- private final CmNotificationSubscriptionDelta cmNotificationSubscriptionDelta;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
- private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler;
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
-
- @Override
- public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
- if (cmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) {
- dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates);
- handleCmNotificationSubscriptionDelta(subscriptionId);
- scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId,
- "subscriptionCreateResponse");
- } else {
- rejectAndPublishCmNotificationSubscriptionCreateRequest(subscriptionId, predicates);
- }
- }
-
- @Override
- public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) {
- dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates);
- sendSubscriptionDeleteRequestToDmi(subscriptionId);
- scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
- }
-
- private void scheduleCmNotificationSubscriptionNcmpOutEventResponse(final String subscriptionId,
- final String eventType) {
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- eventType, null, true);
- }
-
- private void rejectAndPublishCmNotificationSubscriptionCreateRequest(final String subscriptionId,
- final List<Predicate> predicates) {
- final Set<String> subscriptionTargetFilters =
- predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
- .collect(Collectors.toSet());
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- subscriptionId, new ArrayList<>(subscriptionTargetFilters));
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- "subscriptionCreateResponse", cmNotificationSubscriptionNcmpOutEvent, false);
- }
-
- private void handleCmNotificationSubscriptionDelta(final String subscriptionId) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> {
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates =
- cmNotificationSubscriptionDelta.getDelta(
- dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates());
-
- if (dmiCmNotificationSubscriptionPredicates.isEmpty()) {
- acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(subscriptionId, dmiPluginName);
- } else {
- publishCmNotificationSubscriptionDmiInEventPerDmi(subscriptionId, dmiPluginName,
- dmiCmNotificationSubscriptionPredicates);
- }
- });
- }
-
- private void publishCmNotificationSubscriptionDmiInEventPerDmi(final String subscriptionId,
- final String dmiPluginName,
- final List<DmiCmNotificationSubscriptionPredicate>
- dmiCmNotificationSubscriptionPredicates) {
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent(
- dmiCmNotificationSubscriptionPredicates);
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId,
- dmiPluginName, "subscriptionCreateRequest", cmNotificationSubscriptionDmiInEvent);
- }
-
- private void acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(final String subscriptionId,
- final String dmiPluginName) {
- dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
- dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
- dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
- }
-
- private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> {
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent(
- dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates());
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId,
- dmiPluginName, "subscriptionDeleteRequest", cmNotificationSubscriptionDmiInEvent);
- });
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
deleted file mode 100644
index d87624c23c..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
-
-import java.util.Collection;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-
-public interface CmNotificationSubscriptionPersistenceService {
-
- String NCMP_DATASPACE_NAME = "NCMP-Admin";
- String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions";
-
- /**
- * Check if we have an ongoing cm subscription based on the parameters.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @return true for ongoing cmsubscription , otherwise false
- */
- boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath);
-
- /**
- * Check if the subscription ID is unique against ongoing subscriptions.
- *
- * @param subscriptionId subscription ID
- * @return true if subscriptionId is not used in active subscriptions, otherwise false
- */
- boolean isUniqueSubscriptionId(final String subscriptionId);
-
- /**
- * Get all ongoing cm notification subscription based on the parameters.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @return collection of subscription ids of ongoing cm notification subscription
- */
- Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath);
-
- /**
- * Add cm notification subscription.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @param newSubscriptionId subscription id to be added
- */
- void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String newSubscriptionId);
-
- /**
- * Remove cm notification Subscription.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @param subscriptionId subscription id to remove
- */
- void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId);
-
-}
-
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java
new file mode 100644
index 0000000000..fbe21267d9
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription;
+
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class EventsFacade {
+ private final NcmpOutEventProducer ncmpOutEventProducer;
+ private final DmiInEventProducer dmiInEventProducer;
+
+ /**
+ * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
+ * Event compliant.
+ *
+ * @param subscriptionId Cm Subscription id
+ * @param eventType Type of event
+ * @param ncmpOutEvent Cm Notification Subscription Event for the
+ * client
+ * @param isScheduledEvent Determines if the event is to be scheduled
+ * or published now
+ */
+ public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
+ ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, isScheduledEvent);
+ }
+
+ /**
+ * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
+ *
+ * @param subscriptionId Cm Subscription id
+ * @param dmiPluginName Dmi Plugin Name
+ * @param eventType Type of event
+ * @param dmiInEvent Cm Notification Subscription event for Dmi
+ */
+ public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName,
+ final String eventType, final DmiInEvent dmiInEvent) {
+ dmiInEventProducer.publishDmiInEvent(subscriptionId,
+ dmiPluginName, eventType, dmiInEvent);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java
new file mode 100644
index 0000000000..e79b4e6441
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription;
+
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class MappersFacade {
+
+ private final DmiInEventMapper dmiInEventMapper;
+ private final NcmpOutEventMapper ncmpOutEventMapper;
+
+ /**
+ * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
+ *
+ * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates
+ * @return cm notification subscription dmi in event
+ */
+ public DmiInEvent toDmiInEvent(
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ return dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
+ }
+
+ /**
+ * Mapper to form a response for the client for the Cm Notification Subscription.
+ *
+ * @param subscriptionId Cm Notification Subscription id
+ * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin
+ * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEvent(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+ return ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
+ dmiSubscriptionsPerDmi);
+ }
+
+ /**
+ * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
+ *
+ * @param subscriptionId subscription id
+ * @param rejectedTargetFilters list of rejected target filters for the subscription request
+ * @return to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEventForRejectedRequest(
+ final String subscriptionId, final List<String> rejectedTargetFilters) {
+ return ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(
+ subscriptionId, rejectedTargetFilters);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java
index 1d6da90a9a..a4f9be357f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java
@@ -18,18 +18,18 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache;
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
import java.util.Map;
import org.onap.cps.cache.HazelcastCacheConfig;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
-public class CmNotificationSubscriptionCacheConfig extends HazelcastCacheConfig {
+public class CmSubscriptionConfig extends HazelcastCacheConfig {
private static final MapConfig cmNotificationSubscriptionCacheMapConfig =
createMapConfig("cmNotificationSubscriptionCacheMapConfig");
@@ -42,7 +42,7 @@ public class CmNotificationSubscriptionCacheConfig extends HazelcastCacheConfig
* @return configured map of subscription events.
*/
@Bean
- public IMap<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache() {
+ public IMap<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache() {
return createHazelcastInstance("hazelCastInstanceCmNotificationSubscription",
cmNotificationSubscriptionCacheMapConfig).getMap("cmNotificationSubscriptionCache");
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
index 840ab0fb92..c5052f1405 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
@@ -18,9 +18,9 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache;
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus.PENDING;
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,21 +32,21 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
-public class DmiCmNotificationSubscriptionCacheHandler {
+public class DmiCacheHandler {
- private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
- private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
+ private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final Map<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
private final InventoryPersistence inventoryPersistence;
/**
@@ -56,7 +56,7 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param predicates subscription request predicates
*/
public void add(final String subscriptionId, final List<Predicate> predicates) {
- cmNotificationSubscriptionCache.put(subscriptionId, createDmiCmNotificationSubscriptionsPerDmi(predicates));
+ cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates));
}
/**
@@ -65,7 +65,7 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param subscriptionId subscription id
* @return map of dmi cm notification subscriptions per dmi
*/
- public Map<String, DmiCmNotificationSubscriptionDetails> get(final String subscriptionId) {
+ public Map<String, DmiCmSubscriptionDetails> get(final String subscriptionId) {
return cmNotificationSubscriptionCache.get(subscriptionId);
}
@@ -75,15 +75,15 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
* @param subscriptionId subscription id as key in CM notification Subscription cache.
*/
- public void removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(final String subscriptionId) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionsPerDmi =
+ public void removeAcceptedAndRejectedDmiSubscriptionEntries(final String subscriptionId) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
cmNotificationSubscriptionCache.get(subscriptionId);
- final Map<String, DmiCmNotificationSubscriptionDetails> updatedDmiCmNotificationSubscriptionsPerDmi =
- dmiCmNotificationSubscriptionsPerDmi.entrySet().stream().filter(
- dmiCmNotificationSubscription ->
- !isAcceptedOrRejected(dmiCmNotificationSubscription.getValue()))
+ final Map<String, DmiCmSubscriptionDetails> updatedDmiSubscriptionsPerDmi =
+ dmiSubscriptionsPerDmi.entrySet().stream()
+ .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected(
+ dmiCmNotificationSubscription.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiCmNotificationSubscriptionsPerDmi);
+ cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiSubscriptionsPerDmi);
}
/**
@@ -92,9 +92,9 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param predicates CM Subscription Create Request Predicates
* @return Map of DmiCmNotificationSubscription per DMI plugin
*/
- public Map<String, DmiCmNotificationSubscriptionDetails> createDmiCmNotificationSubscriptionsPerDmi(
+ public Map<String, DmiCmSubscriptionDetails> createDmiSubscriptionsPerDmi(
final List<Predicate> predicates) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
new HashMap<>();
for (final Predicate requestPredicate : predicates) {
final List<String> targetFilter = requestPredicate.getTargetFilter();
@@ -103,15 +103,15 @@ public class DmiCmNotificationSubscriptionCacheHandler {
final Set<String> xpaths = new HashSet<>(requestPredicate.getScopeFilter().getXpathFilter());
final Map<String, Set<String>> targetCmHandlesByDmiMap = groupTargetCmHandleIdsByDmi(targetFilter);
for (final Map.Entry<String, Set<String>> targetCmHandlesByDmi: targetCmHandlesByDmiMap.entrySet()) {
- final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate =
- new DmiCmNotificationSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
+ new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
datastoreType, xpaths);
- updateDmiCmNotificationSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(),
- dmiCmNotificationSubscriptionPredicate,
- dmiCmNotificationSubscriptionDetailsPerDmi);
+ updateDmiSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(),
+ dmiCmSubscriptionPredicate,
+ dmiSubscriptionsPerDmi);
}
}
- return dmiCmNotificationSubscriptionDetailsPerDmi;
+ return dmiSubscriptionsPerDmi;
}
/**
@@ -122,13 +122,12 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param status String of status
*
*/
- public void updateDmiCmNotificationSubscriptionStatusPerDmi(final String subscriptionId,
- final String dmiServiceName,
- final CmNotificationSubscriptionStatus status) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
+ public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName,
+ final CmSubscriptionStatus status) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
cmNotificationSubscriptionCache.get(subscriptionId);
- dmiCmNotificationSubscriptionDetailsPerDmi.get(dmiServiceName).setCmNotificationSubscriptionStatus(status);
- cmNotificationSubscriptionCache.put(subscriptionId, dmiCmNotificationSubscriptionDetailsPerDmi);
+ dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status);
+ cmNotificationSubscriptionCache.put(subscriptionId, dmiSubscriptionsPerDmi);
}
/**
@@ -139,18 +138,17 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
*/
public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates();
- for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicateList) {
- final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
- final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
- final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+ .getDmiCmSubscriptionPredicates();
+ for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
+ final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
for (final String cmHandle: cmHandles) {
for (final String xpath: xpaths) {
- cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType, cmHandle,
+ cmSubscriptionPersistenceService.addCmSubscription(datastoreType, cmHandle,
xpath, subscriptionId);
}
}
@@ -165,35 +163,34 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
*/
public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates();
- for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicateList) {
- final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
- final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
- final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+ .getDmiCmSubscriptionPredicates();
+ for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
+ final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
for (final String cmHandle: cmHandles) {
for (final String xpath: xpaths) {
- cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType,
+ cmSubscriptionPersistenceService.removeCmSubscription(datastoreType,
cmHandle, xpath, subscriptionId);
}
}
}
}
- private void updateDmiCmNotificationSubscriptionDetailsPerDmi(
+ private void updateDmiSubscriptionDetailsPerDmi(
final String dmiServiceName,
- final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate,
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi) {
- if (dmiCmNotificationSubscriptionDetailsPerDmi.containsKey(dmiServiceName)) {
- dmiCmNotificationSubscriptionDetailsPerDmi.get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates().add(dmiCmNotificationSubscriptionPredicate);
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+ if (dmiSubscriptionsPerDmi.containsKey(dmiServiceName)) {
+ dmiSubscriptionsPerDmi.get(dmiServiceName)
+ .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate);
} else {
- dmiCmNotificationSubscriptionDetailsPerDmi.put(dmiServiceName,
- new DmiCmNotificationSubscriptionDetails(
- new ArrayList<>(List.of(dmiCmNotificationSubscriptionPredicate)),
+ dmiSubscriptionsPerDmi.put(dmiServiceName,
+ new DmiCmSubscriptionDetails(
+ new ArrayList<>(List.of(dmiCmSubscriptionPredicate)),
PENDING));
}
}
@@ -211,9 +208,8 @@ public class DmiCmNotificationSubscriptionCacheHandler {
return targetCmHandlesByDmiServiceNames;
}
- private boolean isAcceptedOrRejected(
- final DmiCmNotificationSubscriptionDetails dmiCmNotificationSubscription) {
- return dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("ACCEPTED")
- || dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("REJECTED");
+ private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) {
+ return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED")
+ || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
index f635f1a80b..0207fb90e3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avc;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
@@ -33,13 +33,13 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
- * Listener for AVC events.
+ * Listener for AVC events based on Cm Subscriptions.
*/
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class AvcEventConsumer {
+public class CmAvcEventConsumer {
@Value("${app.ncmp.avc.cm-events-topic}")
@@ -50,15 +50,17 @@ public class AvcEventConsumer {
/**
* Incoming AvcEvent in the form of Consumer Record.
*
- * @param avcEventConsumerRecord Incoming raw consumer record
+ * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
@KafkaListener(topics = "${app.dmi.cm-events.topic}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
- log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+ public void consumeAndForward(
+ final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+ log.debug("Consuming AVC event {} ...", cmAvcEventAsConsumerRecord.value());
final String newEventId = UUID.randomUUID().toString();
final CloudEvent outgoingAvcEvent =
- CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ CloudEventBuilder.from(cmAvcEventAsConsumerRecord.value()).withId(newEventId)
+ .build();
eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java
index 7263891a21..4ce4ef36cf 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
import java.util.ArrayList;
import java.util.HashSet;
@@ -27,46 +27,44 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmHandle;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Data;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Predicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.ScopeFilter;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.ScopeFilter;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
-public class CmNotificationSubscriptionDmiInEventMapper {
+public class DmiInEventMapper {
private final InventoryPersistence inventoryPersistence;
/**
* Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
*
- * @param dmiCmNotificationSubscriptionPredicates Collection of Cm Notification Subscription predicates
- * @return CmNotificationSubscriptionDmiInEvent to be sent to DMI Plugin
+ * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates
+ * @return DmiInEvent to be sent to DMI Plugin
*/
- public CmNotificationSubscriptionDmiInEvent toCmNotificationSubscriptionDmiInEvent(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
- new CmNotificationSubscriptionDmiInEvent();
+ public DmiInEvent toDmiInEvent(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final DmiInEvent dmiInEvent = new DmiInEvent();
final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmNotificationSubscriptionPredicates));
- cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmhandleWithPrivateProperties(
- extractUniqueCmHandleIds(dmiCmNotificationSubscriptionPredicates)));
- cmNotificationSubscriptionDmiInEvent.setData(cmSubscriptionData);
- return cmNotificationSubscriptionDmiInEvent;
+ cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmSubscriptionPredicates));
+ cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmHandleWithPrivateProperties(
+ extractUniqueCmHandleIds(dmiCmSubscriptionPredicates)));
+ dmiInEvent.setData(cmSubscriptionData);
+ return dmiInEvent;
}
private List<Predicate> mapToDmiInEventPredicates(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final List<Predicate> predicates = new ArrayList<>();
- dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> {
+ dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> {
final Predicate predicate = new Predicate();
final ScopeFilter scopeFilter = new ScopeFilter();
scopeFilter.setDatastore(ScopeFilter.Datastore.fromValue(
@@ -81,7 +79,7 @@ public class CmNotificationSubscriptionDmiInEventMapper {
}
- private List<CmHandle> mapToCmSubscriptionCmhandleWithPrivateProperties(final Set<String> cmHandleIds) {
+ private List<CmHandle> mapToCmSubscriptionCmHandleWithPrivateProperties(final Set<String> cmHandleIds) {
final List<CmHandle> cmSubscriptionCmHandles = new ArrayList<>();
@@ -99,11 +97,10 @@ public class CmNotificationSubscriptionDmiInEventMapper {
}
- private Set<String> extractUniqueCmHandleIds(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
+ private Set<String> extractUniqueCmHandleIds(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final Set<String> cmHandleIds = new HashSet<>();
- dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
+ dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
return cmHandleIds;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java
index 3273c556c6..c62916f05c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.producer;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
@@ -26,7 +26,7 @@ import java.net.URI;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -35,38 +35,36 @@ import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class CmNotificationSubscriptionDmiInEventProducer {
+public class DmiInEventProducer {
private final EventsPublisher<CloudEvent> eventsPublisher;
private final JsonObjectMapper jsonObjectMapper;
@Value("${app.ncmp.avc.cm-subscription-dmi-in}")
- private String cmNotificationSubscriptionDmiInEventTopic;
+ private String dmiInEventTopic;
/**
* Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
*
- * @param subscriptionId Cm Subscription Id
- * @param dmiPluginName Dmi Plugin Name
- * @param eventType Type of event
- * @param cmNotificationSubscriptionDmiInEvent Cm Notification Subscription event for Dmi
+ * @param subscriptionId Cm Subscription Id
+ * @param dmiPluginName Dmi Plugin Name
+ * @param eventType Type of event
+ * @param dmiInEvent Cm Notification Subscription event for Dmi
*/
- public void publishCmNotificationSubscriptionDmiInEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType, final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent) {
- eventsPublisher.publishCloudEvent(cmNotificationSubscriptionDmiInEventTopic, subscriptionId,
- buildAndGetCmNotificationDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType,
- cmNotificationSubscriptionDmiInEvent));
+ public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName,
+ final String eventType, final DmiInEvent dmiInEvent) {
+ eventsPublisher.publishCloudEvent(dmiInEventTopic, subscriptionId,
+ buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
}
- private CloudEvent buildAndGetCmNotificationDmiInEventAsCloudEvent(final String subscriptionId,
- final String dmiPluginName, final String eventType,
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent) {
+ private CloudEvent buildAndGetDmiInEventAsCloudEvent(final String subscriptionId,
+ final String dmiPluginName, final String eventType, final DmiInEvent dmiInEvent) {
return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
.withSource(URI.create("NCMP"))
.withDataSchema(URI.create("org.onap.ncmp.dmi.cm.subscription:1.0.0"))
.withExtension("correlationid", subscriptionId.concat("#").concat(dmiPluginName))
- .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionDmiInEvent)).build();
+ .withData(jsonObjectMapper.asJsonBytes(dmiInEvent)).build();
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
new file mode 100644
index 0000000000..2a45818624
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
+
+import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
+import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
+import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.NcmpResponseStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class DmiOutEventConsumer {
+
+ private final DmiCacheHandler dmiCacheHandler;
+ private final EventsFacade eventsFacade;
+ private final MappersFacade mappersFacade;
+
+ private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
+
+ /**
+ * Consume the Cm Notification Subscription event from the dmi-plugin.
+ *
+ * @param dmiOutEventAsConsumerRecord the event to be consumed
+ */
+ @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
+ final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
+ final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class);
+ final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
+ if (dmiOutEvent != null && correlationId != null) {
+ final String eventType = cloudEvent.getType();
+ handleDmiOutEvent(correlationId, eventType, dmiOutEvent);
+ }
+ }
+
+ private void handleDmiOutEvent(final String correlationId, final String eventType,
+ final DmiOutEvent dmiOutEvent) {
+ final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
+ final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
+
+ if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED);
+ if (eventType.equals("subscriptionCreateResponse")) {
+ dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+ if (eventType.equals("subscriptionDeleteResponse")) {
+ dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+ handleEventsStatusPerDmi(subscriptionId, eventType);
+ }
+
+ if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED);
+ handleEventsStatusPerDmi(subscriptionId, eventType);
+ }
+
+ log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
+ dmiPluginName, dmiOutEvent.getData().getStatusMessage());
+ }
+
+ private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
+ final CmSubscriptionStatus cmSubscriptionStatus) {
+ dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ cmSubscriptionStatus);
+ }
+
+ private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId,
+ dmiSubscriptionsPerDmi);
+ eventsFacade.publishNcmpOutEvent(subscriptionId, eventType,
+ ncmpOutEvent, false);
+ }
+
+ private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
+ final Data dmiOutData) {
+ return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode())
+ && ncmpResponseStatus.getMessage()
+ .equals(dmiOutData.getStatusMessage());
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java
index 68d54fac95..5b7c46ed00 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java
@@ -18,15 +18,15 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.model;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
-public enum CmNotificationSubscriptionStatus {
+public enum CmSubscriptionStatus {
ACCEPTED("ACCEPTED"), REJECTED("REJECTED"), PENDING("PENDING");
private final String cmNotificationSubscriptionStatusValue;
- CmNotificationSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) {
+ CmSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) {
this.cmNotificationSubscriptionStatusValue = cmNotificationSubscriptionStatusValue;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java
index 95757e7240..dbc607ad27 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.model;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
import java.util.List;
import lombok.AllArgsConstructor;
@@ -28,8 +28,8 @@ import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
-public class DmiCmNotificationSubscriptionDetails {
+public class DmiCmSubscriptionDetails {
- private List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates;
- private CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus;
+ private List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates;
+ private CmSubscriptionStatus cmSubscriptionStatus;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java
index 40c0188fa0..84d3aead8c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.model;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
import java.util.Set;
import lombok.AllArgsConstructor;
@@ -29,7 +29,7 @@ import org.onap.cps.ncmp.api.data.models.DatastoreType;
@Getter
@Setter
@AllArgsConstructor
-public class DmiCmNotificationSubscriptionPredicate {
+public class DmiCmSubscriptionPredicate {
private Set<String> targetCmHandleIds;
private DatastoreType datastoreType;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java
new file mode 100644
index 0000000000..d7f15a2c72
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.data.models.DatastoreType;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class CmSubscriptionComparator {
+
+ private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+
+ /**
+ * Get the new Dmi Predicates for a given predicates list.
+ *
+ * @param existingDmiCmSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates
+ * @return new list of DmiCmNotificationSubscriptionPredicates
+ */
+ public List<DmiCmSubscriptionPredicate> getNewDmiSubscriptionPredicates(
+ final List<DmiCmSubscriptionPredicate> existingDmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> newDmiCmSubscriptionPredicates =
+ new ArrayList<>();
+
+ for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : existingDmiCmSubscriptionPredicates) {
+
+ final Set<String> targetCmHandleIds = new HashSet<>();
+ final Set<String> xpaths = new HashSet<>();
+ final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
+
+ for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) {
+ for (final String xpath : dmiCmSubscriptionPredicate.getXpaths()) {
+ if (!cmSubscriptionPersistenceService.isOngoingCmSubscription(datastoreType,
+ cmHandleId, xpath)) {
+ xpaths.add(xpath);
+ targetCmHandleIds.add(cmHandleId);
+
+ }
+ }
+ }
+
+ populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType,
+ newDmiCmSubscriptionPredicates);
+ }
+ return newDmiCmSubscriptionPredicates;
+ }
+
+ private void populateValidDmiSubscriptionPredicates(final Set<String> targetCmHandleIds,
+ final Set<String> xpaths, final DatastoreType datastoreType,
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ if (!(targetCmHandleIds.isEmpty() || xpaths.isEmpty())) {
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
+ new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
+ dmiCmSubscriptionPredicates.add(dmiCmSubscriptionPredicate);
+ }
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
index 1c52ffa798..3a9b2066b2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
@@ -18,12 +18,12 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
import java.util.List;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
-public interface CmNotificationSubscriptionHandlerService {
+public interface CmSubscriptionHandler {
/**
* Process cm notification subscription create request.
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
new file mode 100644
index 0000000000..e225b705d7
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
+
+ private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final CmSubscriptionComparator cmSubscriptionComparator;
+ private final MappersFacade mappersFacade;
+ private final EventsFacade eventsFacade;
+ private final DmiCacheHandler dmiCacheHandler;
+
+ @Override
+ public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
+ if (cmSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) {
+ dmiCacheHandler.add(subscriptionId, predicates);
+ handleNewCmSubscription(subscriptionId);
+ scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
+ } else {
+ rejectAndPublishCreateRequest(subscriptionId, predicates);
+ }
+ }
+
+ @Override
+ public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) {
+ dmiCacheHandler.add(subscriptionId, predicates);
+ sendSubscriptionDeleteRequestToDmi(subscriptionId);
+ scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ }
+
+ private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
+ eventsFacade.publishNcmpOutEvent(subscriptionId, eventType, null, true);
+ }
+
+ private void rejectAndPublishCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
+ final Set<String> subscriptionTargetFilters =
+ predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
+ .collect(Collectors.toSet());
+ final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEventForRejectedRequest(subscriptionId,
+ new ArrayList<>(subscriptionTargetFilters));
+ eventsFacade.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
+ }
+
+ private void handleNewCmSubscription(final String subscriptionId) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
+ cmSubscriptionComparator.getNewDmiSubscriptionPredicates(
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
+
+ if (dmiCmSubscriptionPredicates.isEmpty()) {
+ acceptAndPublishNcmpOutEventPerDmi(subscriptionId, dmiPluginName);
+ } else {
+ publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
+ }
+ });
+ }
+
+ private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final DmiInEvent dmiInEvent = mappersFacade.toDmiInEvent(dmiCmSubscriptionPredicates);
+ eventsFacade.publishDmiInEvent(subscriptionId, dmiPluginName,
+ "subscriptionCreateRequest", dmiInEvent);
+ }
+
+ private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) {
+ dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ CmSubscriptionStatus.ACCEPTED);
+ dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+
+ private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
+ final DmiInEvent dmiInEvent = mappersFacade.toDmiInEvent(
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ eventsFacade.publishDmiInEvent(subscriptionId, dmiPluginName,
+ "subscriptionDeleteRequest", dmiInEvent);
+ });
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
index 65f4ee8c89..1e1359dd0d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
@@ -27,44 +27,43 @@ import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionHandlerService;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
-public class CmNotificationSubscriptionNcmpInEventConsumer {
+public class NcmpInEventConsumer {
- private final CmNotificationSubscriptionHandlerService cmNotificationSubscriptionHandlerService;
+ private final CmSubscriptionHandler cmSubscriptionHandler;
/**
* Consume the specified event.
*
- * @param subscriptionEventConsumerRecord the event to be consumed
+ * @param ncmpInEventAsConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
- final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
- final CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent =
- toTargetEvent(cloudEvent, CmNotificationSubscriptionNcmpInEvent.class);
+ public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> ncmpInEventAsConsumerRecord) {
+ final CloudEvent cloudEvent = ncmpInEventAsConsumerRecord.value();
+ final NcmpInEvent ncmpInEvent =
+ toTargetEvent(cloudEvent, NcmpInEvent.class);
log.info("Subscription with name {} to be mapped to hazelcast object...",
- cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId());
+ ncmpInEvent.getData().getSubscriptionId());
- final String subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId();
- final List<Predicate> predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates();
+ final String subscriptionId = ncmpInEvent.getData().getSubscriptionId();
+ final List<Predicate> predicates = ncmpInEvent.getData().getPredicates();
if ("subscriptionCreateRequest".equals(cloudEvent.getType())) {
log.info("Subscription create request for source {} with subscription id {} ...",
cloudEvent.getSource(), subscriptionId);
- cmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest(subscriptionId, predicates);
+ cmSubscriptionHandler.processSubscriptionCreateRequest(subscriptionId, predicates);
}
if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) {
log.info("Subscription delete request for source {} with subscription id {} ...",
cloudEvent.getSource(), subscriptionId);
- cmNotificationSubscriptionHandlerService.processSubscriptionDeleteRequest(subscriptionId, predicates);
+ cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates);
}
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java
new file mode 100644
index 0000000000..ffd4b014fb
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class NcmpOutEventMapper {
+
+ /**
+ * Mapper to form a response for the client for the Cm Notification Subscription.
+ *
+ * @param subscriptionId Cm Notification Subscription Id
+ * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin
+ * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEvent(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+
+ final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
+ final Data cmSubscriptionData = new Data();
+ cmSubscriptionData.setSubscriptionId(subscriptionId);
+ populateNcmpOutEventWithCmHandleIds(dmiSubscriptionsPerDmi,
+ cmSubscriptionData);
+ ncmpOutEvent.setData(cmSubscriptionData);
+
+ return ncmpOutEvent;
+ }
+
+ /**
+ * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
+ *
+ * @param subscriptionId subscription id
+ * @param rejectedTargetFilters list of rejected target filters for the subscription request
+ * @return to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEventForRejectedRequest(final String subscriptionId,
+ final List<String> rejectedTargetFilters) {
+ final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
+ final Data cmSubscriptionData = new Data();
+ cmSubscriptionData.setSubscriptionId(subscriptionId);
+ cmSubscriptionData.setRejectedTargets(rejectedTargetFilters);
+ ncmpOutEvent.setData(cmSubscriptionData);
+ return ncmpOutEvent;
+ }
+
+ private void populateNcmpOutEventWithCmHandleIds(
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi,
+ final Data cmSubscriptionData) {
+
+ final List<String> acceptedCmHandleIds = new ArrayList<>();
+ final List<String> pendingCmHandleIds = new ArrayList<>();
+ final List<String> rejectedCmHandleIds = new ArrayList<>();
+
+ dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
+ final CmSubscriptionStatus cmSubscriptionStatus =
+ dmiSubscriptionDetails.getCmSubscriptionStatus();
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates();
+
+ switch (cmSubscriptionStatus) {
+ case ACCEPTED -> acceptedCmHandleIds.addAll(
+ extractCmHandleIds(dmiCmSubscriptionPredicates));
+ case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
+ default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
+ }
+ });
+
+ cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds);
+ cmSubscriptionData.setPendingTargets(pendingCmHandleIds);
+ cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds);
+
+ }
+
+ private List<String> extractCmHandleIds(
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final List<String> cmHandleIds = new ArrayList<>();
+ dmiCmSubscriptionPredicates.forEach(dmiSubscriptionPredicate -> cmHandleIds.addAll(
+ dmiSubscriptionPredicate.getTargetCmHandleIds()));
+
+ return cmHandleIds;
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java
new file mode 100644
index 0000000000..92800f4af1
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java
@@ -0,0 +1,135 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.utils.JsonObjectMapper;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
+public class NcmpOutEventProducer {
+
+ @Value("${app.ncmp.avc.cm-subscription-ncmp-out}")
+ private String ncmpOutEventTopic;
+
+ @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
+ private Integer dmiOutEventTimeoutInMs;
+
+ private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final JsonObjectMapper jsonObjectMapper;
+ private final MappersFacade mappersFacade;
+ private final DmiCacheHandler dmiCacheHandler;
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
+
+ /**
+ * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
+ * Event compliant.
+ *
+ * @param subscriptionId Cm Subscription Id
+ * @param eventType Type of event
+ * @param ncmpOutEvent Cm Notification Subscription Event for the
+ * client
+ * @param isScheduledEvent Determines if the event is to be scheduled
+ * or published now
+ */
+ public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
+
+ if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
+ final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType);
+ scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture);
+ log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId);
+ } else {
+ cancelScheduledTaskForSubscriptionId(subscriptionId);
+ publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
+ log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
+ }
+ }
+
+ private ScheduledFuture<?> scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) {
+ final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
+ new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher,
+ jsonObjectMapper, mappersFacade, dmiCacheHandler);
+ return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) {
+
+ final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId);
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ scheduledTasksPerSubscriptionId.remove(subscriptionId);
+ }
+
+ }
+
+
+ private void publishNcmpOutEventNow(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent) {
+ final CloudEvent ncmpOutEventAsCloudEvent =
+ buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
+ ncmpOutEvent);
+ eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId,
+ ncmpOutEventAsCloudEvent);
+ dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
+ }
+
+ /**
+ * Get an NCMP out event as cloud event.
+ *
+ * @param jsonObjectMapper JSON object mapper
+ * @param subscriptionId subscription id
+ * @param eventType event type
+ * @param ncmpOutEvent cm notification subscription NCMP out event
+ * @return cm notification subscription NCMP out event as cloud event
+ */
+ public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent(final JsonObjectMapper jsonObjectMapper,
+ final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) {
+
+ return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
+ .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
+ .withExtension("correlationid", subscriptionId)
+ .withData(jsonObjectMapper.asJsonBytes(ncmpOutEvent)).build();
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java
new file mode 100644
index 0000000000..5636237566
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.utils.JsonObjectMapper;
+
+@Slf4j
+@RequiredArgsConstructor
+public class NcmpOutEventPublishingTask implements Runnable {
+
+ private final String topicName;
+ private final String subscriptionId;
+ private final String eventType;
+ private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final JsonObjectMapper jsonObjectMapper;
+ private final MappersFacade mappersFacade;
+ private final DmiCacheHandler dmiCacheHandler;
+
+ /**
+ * Delegating the responsibility of publishing NcmpOutEvent as a separate task which will
+ * be called after a specified delay.
+ */
+ @Override
+ public void run() {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId,
+ dmiSubscriptionsPerDmi);
+ eventsPublisher.publishCloudEvent(topicName, subscriptionId,
+ buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
+ ncmpOutEvent));
+ dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
index e2480c5e56..c24507a1a7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
@@ -1,7 +1,6 @@
/*
* ============LICENSE_START=======================================================
* Copyright (C) 2024 Nordix Foundation
- * Modifications Copyright (C) 2024 TechMahindra Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
import static org.onap.cps.spi.FetchDescendantsOption.DIRECT_CHILDREN_ONLY;
import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS;
@@ -43,7 +42,10 @@ import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
-public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotificationSubscriptionPersistenceService {
+public class CmSubscriptionPersistenceService {
+
+ private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
+ private static final String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions";
private static final String SUBSCRIPTION_ANCHOR_NAME = "cm-data-subscriptions";
private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE = """
@@ -64,22 +66,40 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
private final CpsQueryService cpsQueryService;
private final CpsDataService cpsDataService;
- @Override
- public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
- return !getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
+ /**
+ * Check if we have an ongoing cm subscription based on the parameters.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @return true for ongoing cmsubscription , otherwise false
+ */
+ public boolean isOngoingCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath) {
+ return !getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
}
- @Override
+ /**
+ * Check if the subscription ID is unique against ongoing subscriptions.
+ *
+ * @param subscriptionId subscription ID
+ * @return true if subscriptionId is not used in active subscriptions, otherwise false
+ */
public boolean isUniqueSubscriptionId(final String subscriptionId) {
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId),
- OMIT_DESCENDANTS).isEmpty();
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), OMIT_DESCENDANTS).isEmpty();
}
- @Override
- public Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath) {
+ /**
+ * Get all ongoing cm notification subscription based on the parameters.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @return collection of subscription ids of ongoing cm notification subscription
+ */
+ public Collection<String> getOngoingCmSubscriptionIds(final DatastoreType datastoreType,
+ final String cmHandleId, final String xpath) {
final String isOngoingCmSubscriptionCpsPathQuery =
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
@@ -93,24 +113,38 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
return (List<String>) existingNodes.iterator().next().getLeaves().get("subscriptionIds");
}
- @Override
- public void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId) {
- final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
- cmHandleId, xpath);
+ /**
+ * Add cm notification subscription.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @param newSubscriptionId subscription id to be added
+ */
+ public void addCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String newSubscriptionId) {
+ final Collection<String> subscriptionIds =
+ getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
if (subscriptionIds.isEmpty()) {
- addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, subscriptionId);
- } else if (!subscriptionIds.contains(subscriptionId)) {
- subscriptionIds.add(subscriptionId);
+ addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, newSubscriptionId);
+ } else if (!subscriptionIds.contains(newSubscriptionId)) {
+ subscriptionIds.add(newSubscriptionId);
saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
}
}
- @Override
- public void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId) {
- final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
- cmHandleId, xpath);
+ /**
+ * Remove cm notification Subscription.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @param subscriptionId subscription id to remove
+ */
+ public void removeCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String subscriptionId) {
+ final Collection<String> subscriptionIds =
+ getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
if (subscriptionIds.remove(subscriptionId)) {
saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
log.info("There are subscribers left for the following cps path {} :",
@@ -126,16 +160,17 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
}
private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
+ final String xpath) {
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)),
OffsetDateTime.now());
final Collection<DataNode> existingFiltersForCmHandle =
cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- DIRECT_CHILDREN_ONLY).iterator().next().getChildDataNodes();
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
+ datastoreType.getDatastoreName(), cmHandleId),
+ DIRECT_CHILDREN_ONLY).iterator().next()
+ .getChildDataNodes();
if (existingFiltersForCmHandle.isEmpty()) {
removeCmHandleFromDatastore(datastoreType.getDatastoreName(), cmHandleId);
}
@@ -143,46 +178,43 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
private void removeCmHandleFromDatastore(final String datastoreName, final String cmHandleId) {
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreName, cmHandleId), OffsetDateTime.now());
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, cmHandleId),
+ OffsetDateTime.now());
}
private boolean isFirstSubscriptionForCmHandle(final DatastoreType datastoreType, final String cmHandleId) {
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- OMIT_DESCENDANTS).isEmpty();
+ datastoreType.getDatastoreName(), cmHandleId), OMIT_DESCENDANTS).isEmpty();
}
private void addFirstSubscriptionForDatastoreCmHandleAndXpath(final DatastoreType datastoreType,
- final String cmHandleId,
- final String xpath,
- final String subscriptionId) {
+ final String cmHandleId, final String xpath, final String subscriptionId) {
final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, newSubscriptionList);
if (isFirstSubscriptionForCmHandle(datastoreType, cmHandleId)) {
- final String parentXpath = "/datastores/datastore[@name='%s']/cm-handles"
- .formatted(datastoreType.getDatastoreName());
- final String subscriptionAsJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}",
- cmHandleId, subscriptionDetailsAsJson);
+ final String parentXpath =
+ "/datastores/datastore[@name='%s']/cm-handles".formatted(datastoreType.getDatastoreName());
+ final String subscriptionAsJson =
+ String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}", cmHandleId,
+ subscriptionDetailsAsJson);
cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson,
OffsetDateTime.now(), ContentType.JSON);
} else {
cpsDataService.saveListElements(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- subscriptionDetailsAsJson, OffsetDateTime.now());
+ datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson,
+ OffsetDateTime.now());
}
}
- private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath,
- final Collection<String> subscriptionIds) {
+ private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId, final String xpath,
+ final Collection<String> subscriptionIds) {
final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, subscriptionIds);
cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson,
- OffsetDateTime.now(), ContentType.JSON);
+ datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, OffsetDateTime.now(),
+ ContentType.JSON);
}
private String getSubscriptionDetailsAsJson(final String xpath, final Collection<String> subscriptionIds) {
@@ -194,4 +226,6 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
private static String escapeQuotesByDoublingThem(final String inputXpath) {
return inputXpath.replace("'", "''");
}
+
}
+