summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
authorDaniel Hanrahan <daniel.hanrahan@est.tech>2024-07-05 10:23:48 +0000
committerGerrit Code Review <gerrit@onap.org>2024-07-05 10:23:48 +0000
commitc1c26ec8a97ce7de7d976665ae0c147fbf9ad80d (patch)
treee1273a27cd58f3e7ce617c48bf4e5418edb74cc7 /cps-ncmp-service/src
parent82053f446aa1eb35e2a05e2557431497b15b031b (diff)
parentbac230bdbe224023d424eda2cd2ef7422f5c3ed6 (diff)
Merge "refactor cmsubscription code"
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java82
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java70
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java78
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java63
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java123
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java114
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java146
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java134
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java84
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java65
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java77
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java)8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java)120
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java)16
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java)47
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java)32
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java118
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java)6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java)8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java)4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java83
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java)6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java121
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java)31
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java112
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java135
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java63
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java)130
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/OpenTelemetryConfigSpec.groovy)0
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDeltaSpec.groovy60
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy88
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapperSpec.groovy64
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy147
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacadeSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandlerSpec.groovy)38
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacadeSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandlerSpec.groovy)27
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfigSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfigSpec.groovy)22
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy)88
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy)8
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapperSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapperSpec.groovy)14
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiInEventProducerSpec.groovy)29
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy)52
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparatorSpec.groovy62
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy145
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy)24
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapperSpec.groovy69
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy89
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy)87
47 files changed, 1551 insertions, 1638 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java
deleted file mode 100644
index ff322ee3cc..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDelta.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionDelta {
-
- private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
-
- /**
- * Get the delta for a given predicates list.
- *
- * @param dmiCmNotificationSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates
- * @return delta list of DmiCmNotificationSubscriptionPredicates
- */
- public List<DmiCmNotificationSubscriptionPredicate> getDelta(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- final List<DmiCmNotificationSubscriptionPredicate> delta = new ArrayList<>();
-
- for (final DmiCmNotificationSubscriptionPredicate cmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicates) {
-
- final Set<String> targetCmHandleIds = new HashSet<>();
- final Set<String> xpaths = new HashSet<>();
- final DatastoreType datastoreType = cmNotificationSubscriptionPredicate.getDatastoreType();
-
- for (final String cmHandleId : cmNotificationSubscriptionPredicate.getTargetCmHandleIds()) {
- for (final String xpath : cmNotificationSubscriptionPredicate.getXpaths()) {
- if (!cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType,
- cmHandleId, xpath)) {
- xpaths.add(xpath);
- targetCmHandleIds.add(cmHandleId);
-
- }
- }
- }
-
- populateValidDmiCmNotificationSubscriptionPredicateDelta(targetCmHandleIds, xpaths, datastoreType, delta);
- }
- return delta;
- }
-
- private void populateValidDmiCmNotificationSubscriptionPredicateDelta(final Set<String> targetCmHandleIds,
- final Set<String> xpaths, final DatastoreType datastoreType,
- final List<DmiCmNotificationSubscriptionPredicate> delta) {
- if (!(targetCmHandleIds.isEmpty() || xpaths.isEmpty())) {
- final DmiCmNotificationSubscriptionPredicate predicateDelta =
- new DmiCmNotificationSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
- delta.add(predicateDelta);
- }
- }
-
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java
deleted file mode 100644
index 50a5df537d..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionDmiInEventProducer;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionEventsHandler {
- private final CmNotificationSubscriptionNcmpOutEventProducer cmNotificationSubscriptionNcmpOutEventProducer;
- private final CmNotificationSubscriptionDmiInEventProducer cmNotificationSubscriptionDmiInEventProducer;
-
- /**
- * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
- * Event compliant.
- *
- * @param subscriptionId Cm Subscription id
- * @param eventType Type of event
- * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the
- * client
- * @param isScheduledEvent Determines if the event is to be scheduled
- * or published now
- */
- public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent
- cmNotificationSubscriptionNcmpOutEvent,
- final boolean isScheduledEvent) {
- cmNotificationSubscriptionNcmpOutEventProducer.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- eventType, cmNotificationSubscriptionNcmpOutEvent, isScheduledEvent);
- }
-
- /**
- * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
- *
- * @param subscriptionId Cm Subscription id
- * @param dmiPluginName Dmi Plugin Name
- * @param eventType Type of event
- * @param cmNotificationSubscriptionDmiInEvent Cm Notification Subscription event for Dmi
- */
- public void publishCmNotificationSubscriptionDmiInEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType,
- final CmNotificationSubscriptionDmiInEvent
- cmNotificationSubscriptionDmiInEvent) {
- cmNotificationSubscriptionDmiInEventProducer.publishCmNotificationSubscriptionDmiInEvent(subscriptionId,
- dmiPluginName, eventType, cmNotificationSubscriptionDmiInEvent);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java
deleted file mode 100644
index 73f9563ecf..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionDmiInEventMapper;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionMappersHandler {
-
- private final CmNotificationSubscriptionDmiInEventMapper cmNotificationSubscriptionDmiInEventMapper;
- private final CmNotificationSubscriptionNcmpOutEventMapper cmNotificationSubscriptionNcmpOutEventMapper;
-
- /**
- * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
- *
- * @param dmiCmNotificationSubscriptionPredicates Collection of Cm Notification Subscription predicates
- * @return cm notification subscription dmi in event
- */
- public CmNotificationSubscriptionDmiInEvent toCmNotificationSubscriptionDmiInEvent(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- return cmNotificationSubscriptionDmiInEventMapper.toCmNotificationSubscriptionDmiInEvent(
- dmiCmNotificationSubscriptionPredicates);
- }
-
- /**
- * Mapper to form a response for the client for the Cm Notification Subscription.
- *
- * @param subscriptionId Cm Notification Subscription id
- * @param dmiCmNotificationSubscriptionDetailsMap contains CmNotificationSubscriptionDetails per dmi plugin
- * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap) {
- return cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- dmiCmNotificationSubscriptionDetailsMap);
- }
-
- /**
- * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
- *
- * @param subscriptionId subscription id
- * @param rejectedTargetFilters list of rejected target filters for the subscription request
- * @return to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- final String subscriptionId, final List<String> rejectedTargetFilters) {
- return cmNotificationSubscriptionNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- subscriptionId, rejectedTargetFilters);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
deleted file mode 100644
index f7dd51e637..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventPublishingTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
-
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer.buildAndGetCmNotificationNcmpOutEventAsCloudEvent;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.onap.cps.utils.JsonObjectMapper;
-
-@Slf4j
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionNcmpOutEventPublishingTask implements Runnable {
-
- private final String topicName;
- private final String subscriptionId;
- private final String eventType;
- private final EventsPublisher<CloudEvent> eventsPublisher;
- private final JsonObjectMapper jsonObjectMapper;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
-
- /**
- * Delegating the responsibility of publishing CmNotificationSubscriptionNcmpOutEvent as a separate task which will
- * be called after a specified delay.
- */
- @Override
- public void run() {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- dmiCmNotificationSubscriptionDetailsMap);
- eventsPublisher.publishCloudEvent(topicName, subscriptionId,
- buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
- cmNotificationSubscriptionNcmpOutEvent));
- dmiCmNotificationSubscriptionCacheHandler
- .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java
deleted file mode 100644
index 978a4cdfe2..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionDmiOutEventConsumer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer;
-
-import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
-import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
-import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
-
-import io.cloudevents.CloudEvent;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.api.NcmpResponseStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionDmiOutEventConsumer {
-
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
- private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
-
- private static final String CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
-
- /**
- * Consume the Cm Notification Subscription event from the dmi-plugin.
- *
- * @param cmNotificationSubscriptionDmiOutEventConsumerRecord the event to be consumed
- */
- @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeCmNotificationSubscriptionDmiOutEvent(
- final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiOutEventConsumerRecord) {
- final CloudEvent cloudEvent = cmNotificationSubscriptionDmiOutEventConsumerRecord.value();
- final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
- toTargetEvent(cloudEvent, CmNotificationSubscriptionDmiOutEvent.class);
- final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
- if (cmNotificationSubscriptionDmiOutEvent != null && correlationId != null) {
- final String eventType = cloudEvent.getType();
- handleCmSubscriptionDmiOutEvent(correlationId, eventType, cmNotificationSubscriptionDmiOutEvent);
- }
- }
-
- private void handleCmSubscriptionDmiOutEvent(final String correlationId,
- final String eventType,
- final CmNotificationSubscriptionDmiOutEvent
- cmNotificationSubscriptionDmiOutEvent) {
- final String subscriptionId = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
- final String dmiPluginName = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
-
- if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, cmNotificationSubscriptionDmiOutEvent.getData())) {
- handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
- if (eventType.equals("subscriptionCreateResponse")) {
- dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
- }
- if (eventType.equals("subscriptionDeleteResponse")) {
- dmiCmNotificationSubscriptionCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
- }
- handleEventsStatusPerDmi(subscriptionId, eventType);
- }
-
- if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, cmNotificationSubscriptionDmiOutEvent.getData())) {
- handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
- handleEventsStatusPerDmi(subscriptionId, eventType);
- }
-
- log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
- dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
- }
-
- private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
- final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
- dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
- dmiPluginName, cmNotificationSubscriptionStatus);
- }
-
- private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- dmiCmNotificationSubscriptionDetailsPerDmi);
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- eventType, cmNotificationSubscriptionNcmpOutEvent, false);
- }
-
- private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
- final Data cmNotificationSubscriptionDmiOutData) {
- return ncmpResponseStatus.getCode().equals(cmNotificationSubscriptionDmiOutData.getStatusCode())
- && ncmpResponseStatus.getMessage()
- .equals(cmNotificationSubscriptionDmiOutData.getStatusMessage());
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java
deleted file mode 100644
index ea21751691..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data;
-import org.springframework.stereotype.Component;
-
-@Component
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionNcmpOutEventMapper {
-
- /**
- * Mapper to form a response for the client for the Cm Notification Subscription.
- *
- * @param subscriptionId Cm Notification Subscription Id
- * @param dmiCmNotificationSubscriptionDetailsMap contains CmNotificationSubscriptionDetails per dmi plugin
- * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap) {
-
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- new CmNotificationSubscriptionNcmpOutEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setSubscriptionId(subscriptionId);
- populateCmNotificationSubscriptionNcmpOutEventWithCmHandleIds(dmiCmNotificationSubscriptionDetailsMap,
- cmSubscriptionData);
- cmNotificationSubscriptionNcmpOutEvent.setData(cmSubscriptionData);
-
- return cmNotificationSubscriptionNcmpOutEvent;
- }
-
- /**
- * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
- *
- * @param subscriptionId subscription id
- * @param rejectedTargetFilters list of rejected target filters for the subscription request
- * @return to sent back to the client
- */
- public CmNotificationSubscriptionNcmpOutEvent toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- final String subscriptionId, final List<String> rejectedTargetFilters) {
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- new CmNotificationSubscriptionNcmpOutEvent();
- final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setSubscriptionId(subscriptionId);
- cmSubscriptionData.setRejectedTargets(rejectedTargetFilters);
- cmNotificationSubscriptionNcmpOutEvent.setData(cmSubscriptionData);
- return cmNotificationSubscriptionNcmpOutEvent;
- }
-
- private void populateCmNotificationSubscriptionNcmpOutEventWithCmHandleIds(
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap,
- final Data cmSubscriptionData) {
-
- final List<String> acceptedCmHandleIds = new ArrayList<>();
- final List<String> pendingCmHandleIds = new ArrayList<>();
- final List<String> rejectedCmHandleIds = new ArrayList<>();
-
- dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> {
- final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus =
- dmiCmNotificationSubscriptionDetails.getCmNotificationSubscriptionStatus();
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates =
- dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates();
-
- switch (cmNotificationSubscriptionStatus) {
- case ACCEPTED -> acceptedCmHandleIds.addAll(
- extractCmHandleIds(dmiCmNotificationSubscriptionPredicates));
- case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmNotificationSubscriptionPredicates));
- default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmNotificationSubscriptionPredicates));
- }
- });
-
- cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds);
- cmSubscriptionData.setPendingTargets(pendingCmHandleIds);
- cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds);
-
- }
-
- private List<String> extractCmHandleIds(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- final List<String> cmHandleIds = new ArrayList<>();
- dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
- dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
-
- return cmHandleIds;
- }
-
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java
deleted file mode 100644
index ed7ed2a0ba..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionNcmpOutEventProducer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.producer;
-
-import io.cloudevents.CloudEvent;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import java.net.URI;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionNcmpOutEventPublishingTask;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.onap.cps.utils.JsonObjectMapper;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class CmNotificationSubscriptionNcmpOutEventProducer {
-
- @Value("${app.ncmp.avc.cm-subscription-ncmp-out}")
- private String cmNotificationSubscriptionNcmpOutEventTopic;
-
- @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
- private Integer cmNotificationSubscriptionDmiOutEventTimeoutInMs;
-
- private final EventsPublisher<CloudEvent> eventsPublisher;
- private final JsonObjectMapper jsonObjectMapper;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
-
- /**
- * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
- * Event compliant.
- *
- * @param subscriptionId Cm Subscription Id
- * @param eventType Type of event
- * @param cmNotificationSubscriptionNcmpOutEvent Cm Notification Subscription Event for the
- * client
- * @param isScheduledEvent Determines if the event is to be scheduled
- * or published now
- */
- public void publishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent
- cmNotificationSubscriptionNcmpOutEvent,
- final boolean isScheduledEvent) {
-
- if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
- final ScheduledFuture<?> scheduledFuture =
- scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType);
- scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture);
- log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId);
- } else {
- cancelScheduledTaskForSubscriptionId(subscriptionId);
- publishCmNotificationSubscriptionNcmpOutEventNow(subscriptionId, eventType,
- cmNotificationSubscriptionNcmpOutEvent);
- log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
- }
- }
-
- private ScheduledFuture<?> scheduleAndPublishCmNotificationSubscriptionNcmpOutEvent(final String subscriptionId,
- final String eventType) {
- final CmNotificationSubscriptionNcmpOutEventPublishingTask
- cmNotificationSubscriptionNcmpOutEventPublishingTask =
- new CmNotificationSubscriptionNcmpOutEventPublishingTask(cmNotificationSubscriptionNcmpOutEventTopic,
- subscriptionId, eventType, eventsPublisher, jsonObjectMapper,
- cmNotificationSubscriptionMappersHandler, dmiCmNotificationSubscriptionCacheHandler);
- return scheduledExecutorService.schedule(cmNotificationSubscriptionNcmpOutEventPublishingTask,
- cmNotificationSubscriptionDmiOutEventTimeoutInMs, TimeUnit.MILLISECONDS);
- }
-
- private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) {
-
- final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId);
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- scheduledTasksPerSubscriptionId.remove(subscriptionId);
- }
-
- }
-
-
- private void publishCmNotificationSubscriptionNcmpOutEventNow(final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent
- cmNotificationSubscriptionNcmpOutEvent) {
- final CloudEvent cmNotificationSubscriptionNcmpOutEventAsCloudEvent =
- buildAndGetCmNotificationNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
- cmNotificationSubscriptionNcmpOutEvent);
- eventsPublisher.publishCloudEvent(cmNotificationSubscriptionNcmpOutEventTopic, subscriptionId,
- cmNotificationSubscriptionNcmpOutEventAsCloudEvent);
- dmiCmNotificationSubscriptionCacheHandler
- .removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId);
- }
-
- /**
- * Get an NCMP out event as cloud event.
- *
- * @param jsonObjectMapper JSON object mapper
- * @param subscriptionId subscription id
- * @param eventType event type
- * @param cmNotificationSubscriptionNcmpOutEvent cm notification subscription NCMP out event
- * @return cm notification subscription NCMP out event as cloud event
- */
- public static CloudEvent buildAndGetCmNotificationNcmpOutEventAsCloudEvent(
- final JsonObjectMapper jsonObjectMapper, final String subscriptionId, final String eventType,
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent) {
-
- return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
- .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
- .withExtension("correlationid", subscriptionId)
- .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionNcmpOutEvent)).build();
- }
-
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java
deleted file mode 100644
index 08e3c95529..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionDelta;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
-import org.springframework.stereotype.Service;
-
-@Service
-@RequiredArgsConstructor
-public class CmNotificationSubscriptionHandlerServiceImpl implements CmNotificationSubscriptionHandlerService {
-
- private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
- private final CmNotificationSubscriptionDelta cmNotificationSubscriptionDelta;
- private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
- private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler;
- private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
-
- @Override
- public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
- if (cmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) {
- dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates);
- handleCmNotificationSubscriptionDelta(subscriptionId);
- scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId,
- "subscriptionCreateResponse");
- } else {
- rejectAndPublishCmNotificationSubscriptionCreateRequest(subscriptionId, predicates);
- }
- }
-
- @Override
- public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) {
- dmiCmNotificationSubscriptionCacheHandler.add(subscriptionId, predicates);
- sendSubscriptionDeleteRequestToDmi(subscriptionId);
- scheduleCmNotificationSubscriptionNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
- }
-
- private void scheduleCmNotificationSubscriptionNcmpOutEventResponse(final String subscriptionId,
- final String eventType) {
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- eventType, null, true);
- }
-
- private void rejectAndPublishCmNotificationSubscriptionCreateRequest(final String subscriptionId,
- final List<Predicate> predicates) {
- final Set<String> subscriptionTargetFilters =
- predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
- .collect(Collectors.toSet());
- final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- subscriptionId, new ArrayList<>(subscriptionTargetFilters));
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
- "subscriptionCreateResponse", cmNotificationSubscriptionNcmpOutEvent, false);
- }
-
- private void handleCmNotificationSubscriptionDelta(final String subscriptionId) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> {
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates =
- cmNotificationSubscriptionDelta.getDelta(
- dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates());
-
- if (dmiCmNotificationSubscriptionPredicates.isEmpty()) {
- acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(subscriptionId, dmiPluginName);
- } else {
- publishCmNotificationSubscriptionDmiInEventPerDmi(subscriptionId, dmiPluginName,
- dmiCmNotificationSubscriptionPredicates);
- }
- });
- }
-
- private void publishCmNotificationSubscriptionDmiInEventPerDmi(final String subscriptionId,
- final String dmiPluginName,
- final List<DmiCmNotificationSubscriptionPredicate>
- dmiCmNotificationSubscriptionPredicates) {
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent(
- dmiCmNotificationSubscriptionPredicates);
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId,
- dmiPluginName, "subscriptionCreateRequest", cmNotificationSubscriptionDmiInEvent);
- }
-
- private void acceptAndPublishCmNotificationSubscriptionNcmpOutEventPerDmi(final String subscriptionId,
- final String dmiPluginName) {
- dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
- dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
- dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
- }
-
- private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap =
- dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
- dmiCmNotificationSubscriptionDetailsMap.forEach((dmiPluginName, dmiCmNotificationSubscriptionDetails) -> {
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
- cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent(
- dmiCmNotificationSubscriptionDetails.getDmiCmNotificationSubscriptionPredicates());
- cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(subscriptionId,
- dmiPluginName, "subscriptionDeleteRequest", cmNotificationSubscriptionDmiInEvent);
- });
- }
-} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
deleted file mode 100644
index d87624c23c..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
-
-import java.util.Collection;
-import org.onap.cps.ncmp.api.data.models.DatastoreType;
-
-public interface CmNotificationSubscriptionPersistenceService {
-
- String NCMP_DATASPACE_NAME = "NCMP-Admin";
- String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions";
-
- /**
- * Check if we have an ongoing cm subscription based on the parameters.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @return true for ongoing cmsubscription , otherwise false
- */
- boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath);
-
- /**
- * Check if the subscription ID is unique against ongoing subscriptions.
- *
- * @param subscriptionId subscription ID
- * @return true if subscriptionId is not used in active subscriptions, otherwise false
- */
- boolean isUniqueSubscriptionId(final String subscriptionId);
-
- /**
- * Get all ongoing cm notification subscription based on the parameters.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @return collection of subscription ids of ongoing cm notification subscription
- */
- Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath);
-
- /**
- * Add cm notification subscription.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @param newSubscriptionId subscription id to be added
- */
- void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String newSubscriptionId);
-
- /**
- * Remove cm notification Subscription.
- *
- * @param datastoreType the susbcription target datastore type
- * @param cmHandleId the id of the cm handle for the susbcription
- * @param xpath the target xpath
- * @param subscriptionId subscription id to remove
- */
- void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId);
-
-}
-
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java
new file mode 100644
index 0000000000..fbe21267d9
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacade.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription;
+
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class EventsFacade {
+ private final NcmpOutEventProducer ncmpOutEventProducer;
+ private final DmiInEventProducer dmiInEventProducer;
+
+ /**
+ * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
+ * Event compliant.
+ *
+ * @param subscriptionId Cm Subscription id
+ * @param eventType Type of event
+ * @param ncmpOutEvent Cm Notification Subscription Event for the
+ * client
+ * @param isScheduledEvent Determines if the event is to be scheduled
+ * or published now
+ */
+ public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
+ ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, isScheduledEvent);
+ }
+
+ /**
+ * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
+ *
+ * @param subscriptionId Cm Subscription id
+ * @param dmiPluginName Dmi Plugin Name
+ * @param eventType Type of event
+ * @param dmiInEvent Cm Notification Subscription event for Dmi
+ */
+ public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName,
+ final String eventType, final DmiInEvent dmiInEvent) {
+ dmiInEventProducer.publishDmiInEvent(subscriptionId,
+ dmiPluginName, eventType, dmiInEvent);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java
new file mode 100644
index 0000000000..e79b4e6441
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacade.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription;
+
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class MappersFacade {
+
+ private final DmiInEventMapper dmiInEventMapper;
+ private final NcmpOutEventMapper ncmpOutEventMapper;
+
+ /**
+ * Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
+ *
+ * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates
+ * @return cm notification subscription dmi in event
+ */
+ public DmiInEvent toDmiInEvent(
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ return dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
+ }
+
+ /**
+ * Mapper to form a response for the client for the Cm Notification Subscription.
+ *
+ * @param subscriptionId Cm Notification Subscription id
+ * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin
+ * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEvent(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+ return ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
+ dmiSubscriptionsPerDmi);
+ }
+
+ /**
+ * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
+ *
+ * @param subscriptionId subscription id
+ * @param rejectedTargetFilters list of rejected target filters for the subscription request
+ * @return to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEventForRejectedRequest(
+ final String subscriptionId, final List<String> rejectedTargetFilters) {
+ return ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(
+ subscriptionId, rejectedTargetFilters);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java
index 1d6da90a9a..a4f9be357f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfig.java
@@ -18,18 +18,18 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache;
import com.hazelcast.config.MapConfig;
import com.hazelcast.map.IMap;
import java.util.Map;
import org.onap.cps.cache.HazelcastCacheConfig;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
-public class CmNotificationSubscriptionCacheConfig extends HazelcastCacheConfig {
+public class CmSubscriptionConfig extends HazelcastCacheConfig {
private static final MapConfig cmNotificationSubscriptionCacheMapConfig =
createMapConfig("cmNotificationSubscriptionCacheMapConfig");
@@ -42,7 +42,7 @@ public class CmNotificationSubscriptionCacheConfig extends HazelcastCacheConfig
* @return configured map of subscription events.
*/
@Bean
- public IMap<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache() {
+ public IMap<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache() {
return createHazelcastInstance("hazelCastInstanceCmNotificationSubscription",
cmNotificationSubscriptionCacheMapConfig).getMap("cmNotificationSubscriptionCache");
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
index 840ab0fb92..c5052f1405 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandler.java
@@ -18,9 +18,9 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache;
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus.PENDING;
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,21 +32,21 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.onap.cps.ncmp.api.data.models.DatastoreType;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
-public class DmiCmNotificationSubscriptionCacheHandler {
+public class DmiCacheHandler {
- private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
- private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
+ private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final Map<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
private final InventoryPersistence inventoryPersistence;
/**
@@ -56,7 +56,7 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param predicates subscription request predicates
*/
public void add(final String subscriptionId, final List<Predicate> predicates) {
- cmNotificationSubscriptionCache.put(subscriptionId, createDmiCmNotificationSubscriptionsPerDmi(predicates));
+ cmNotificationSubscriptionCache.put(subscriptionId, createDmiSubscriptionsPerDmi(predicates));
}
/**
@@ -65,7 +65,7 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param subscriptionId subscription id
* @return map of dmi cm notification subscriptions per dmi
*/
- public Map<String, DmiCmNotificationSubscriptionDetails> get(final String subscriptionId) {
+ public Map<String, DmiCmSubscriptionDetails> get(final String subscriptionId) {
return cmNotificationSubscriptionCache.get(subscriptionId);
}
@@ -75,15 +75,15 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
* @param subscriptionId subscription id as key in CM notification Subscription cache.
*/
- public void removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(final String subscriptionId) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionsPerDmi =
+ public void removeAcceptedAndRejectedDmiSubscriptionEntries(final String subscriptionId) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
cmNotificationSubscriptionCache.get(subscriptionId);
- final Map<String, DmiCmNotificationSubscriptionDetails> updatedDmiCmNotificationSubscriptionsPerDmi =
- dmiCmNotificationSubscriptionsPerDmi.entrySet().stream().filter(
- dmiCmNotificationSubscription ->
- !isAcceptedOrRejected(dmiCmNotificationSubscription.getValue()))
+ final Map<String, DmiCmSubscriptionDetails> updatedDmiSubscriptionsPerDmi =
+ dmiSubscriptionsPerDmi.entrySet().stream()
+ .filter(dmiCmNotificationSubscription -> !isAcceptedOrRejected(
+ dmiCmNotificationSubscription.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiCmNotificationSubscriptionsPerDmi);
+ cmNotificationSubscriptionCache.put(subscriptionId, updatedDmiSubscriptionsPerDmi);
}
/**
@@ -92,9 +92,9 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param predicates CM Subscription Create Request Predicates
* @return Map of DmiCmNotificationSubscription per DMI plugin
*/
- public Map<String, DmiCmNotificationSubscriptionDetails> createDmiCmNotificationSubscriptionsPerDmi(
+ public Map<String, DmiCmSubscriptionDetails> createDmiSubscriptionsPerDmi(
final List<Predicate> predicates) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
new HashMap<>();
for (final Predicate requestPredicate : predicates) {
final List<String> targetFilter = requestPredicate.getTargetFilter();
@@ -103,15 +103,15 @@ public class DmiCmNotificationSubscriptionCacheHandler {
final Set<String> xpaths = new HashSet<>(requestPredicate.getScopeFilter().getXpathFilter());
final Map<String, Set<String>> targetCmHandlesByDmiMap = groupTargetCmHandleIdsByDmi(targetFilter);
for (final Map.Entry<String, Set<String>> targetCmHandlesByDmi: targetCmHandlesByDmiMap.entrySet()) {
- final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate =
- new DmiCmNotificationSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
+ new DmiCmSubscriptionPredicate(targetCmHandlesByDmi.getValue(),
datastoreType, xpaths);
- updateDmiCmNotificationSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(),
- dmiCmNotificationSubscriptionPredicate,
- dmiCmNotificationSubscriptionDetailsPerDmi);
+ updateDmiSubscriptionDetailsPerDmi(targetCmHandlesByDmi.getKey(),
+ dmiCmSubscriptionPredicate,
+ dmiSubscriptionsPerDmi);
}
}
- return dmiCmNotificationSubscriptionDetailsPerDmi;
+ return dmiSubscriptionsPerDmi;
}
/**
@@ -122,13 +122,12 @@ public class DmiCmNotificationSubscriptionCacheHandler {
* @param status String of status
*
*/
- public void updateDmiCmNotificationSubscriptionStatusPerDmi(final String subscriptionId,
- final String dmiServiceName,
- final CmNotificationSubscriptionStatus status) {
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
+ public void updateDmiSubscriptionStatusPerDmi(final String subscriptionId, final String dmiServiceName,
+ final CmSubscriptionStatus status) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
cmNotificationSubscriptionCache.get(subscriptionId);
- dmiCmNotificationSubscriptionDetailsPerDmi.get(dmiServiceName).setCmNotificationSubscriptionStatus(status);
- cmNotificationSubscriptionCache.put(subscriptionId, dmiCmNotificationSubscriptionDetailsPerDmi);
+ dmiSubscriptionsPerDmi.get(dmiServiceName).setCmSubscriptionStatus(status);
+ cmNotificationSubscriptionCache.put(subscriptionId, dmiSubscriptionsPerDmi);
}
/**
@@ -139,18 +138,17 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
*/
public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates();
- for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicateList) {
- final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
- final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
- final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+ .getDmiCmSubscriptionPredicates();
+ for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
+ final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
for (final String cmHandle: cmHandles) {
for (final String xpath: xpaths) {
- cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType, cmHandle,
+ cmSubscriptionPersistenceService.addCmSubscription(datastoreType, cmHandle,
xpath, subscriptionId);
}
}
@@ -165,35 +163,34 @@ public class DmiCmNotificationSubscriptionCacheHandler {
*
*/
public void removeFromDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates();
- for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
- dmiCmNotificationSubscriptionPredicateList) {
- final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
- final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
- final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+ .getDmiCmSubscriptionPredicates();
+ for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : dmiCmSubscriptionPredicates) {
+ final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmSubscriptionPredicate.getXpaths();
for (final String cmHandle: cmHandles) {
for (final String xpath: xpaths) {
- cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType,
+ cmSubscriptionPersistenceService.removeCmSubscription(datastoreType,
cmHandle, xpath, subscriptionId);
}
}
}
}
- private void updateDmiCmNotificationSubscriptionDetailsPerDmi(
+ private void updateDmiSubscriptionDetailsPerDmi(
final String dmiServiceName,
- final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate,
- final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi) {
- if (dmiCmNotificationSubscriptionDetailsPerDmi.containsKey(dmiServiceName)) {
- dmiCmNotificationSubscriptionDetailsPerDmi.get(dmiServiceName)
- .getDmiCmNotificationSubscriptionPredicates().add(dmiCmNotificationSubscriptionPredicate);
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+ if (dmiSubscriptionsPerDmi.containsKey(dmiServiceName)) {
+ dmiSubscriptionsPerDmi.get(dmiServiceName)
+ .getDmiCmSubscriptionPredicates().add(dmiCmSubscriptionPredicate);
} else {
- dmiCmNotificationSubscriptionDetailsPerDmi.put(dmiServiceName,
- new DmiCmNotificationSubscriptionDetails(
- new ArrayList<>(List.of(dmiCmNotificationSubscriptionPredicate)),
+ dmiSubscriptionsPerDmi.put(dmiServiceName,
+ new DmiCmSubscriptionDetails(
+ new ArrayList<>(List.of(dmiCmSubscriptionPredicate)),
PENDING));
}
}
@@ -211,9 +208,8 @@ public class DmiCmNotificationSubscriptionCacheHandler {
return targetCmHandlesByDmiServiceNames;
}
- private boolean isAcceptedOrRejected(
- final DmiCmNotificationSubscriptionDetails dmiCmNotificationSubscription) {
- return dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("ACCEPTED")
- || dmiCmNotificationSubscription.getCmNotificationSubscriptionStatus().toString().equals("REJECTED");
+ private boolean isAcceptedOrRejected(final DmiCmSubscriptionDetails dmiCmSubscription) {
+ return dmiCmSubscription.getCmSubscriptionStatus().toString().equals("ACCEPTED")
+ || dmiCmSubscription.getCmSubscriptionStatus().toString().equals("REJECTED");
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
index f635f1a80b..0207fb90e3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avc;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
@@ -33,13 +33,13 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
- * Listener for AVC events.
+ * Listener for AVC events based on Cm Subscriptions.
*/
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class AvcEventConsumer {
+public class CmAvcEventConsumer {
@Value("${app.ncmp.avc.cm-events-topic}")
@@ -50,15 +50,17 @@ public class AvcEventConsumer {
/**
* Incoming AvcEvent in the form of Consumer Record.
*
- * @param avcEventConsumerRecord Incoming raw consumer record
+ * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
*/
@KafkaListener(topics = "${app.dmi.cm-events.topic}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
- log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
+ public void consumeAndForward(
+ final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
+ log.debug("Consuming AVC event {} ...", cmAvcEventAsConsumerRecord.value());
final String newEventId = UUID.randomUUID().toString();
final CloudEvent outgoingAvcEvent =
- CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build();
+ CloudEventBuilder.from(cmAvcEventAsConsumerRecord.value()).withId(newEventId)
+ .build();
eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java
index 7263891a21..4ce4ef36cf 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapper.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
import java.util.ArrayList;
import java.util.HashSet;
@@ -27,46 +27,44 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmHandle;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Data;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Predicate;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.ScopeFilter;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.ScopeFilter;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
-public class CmNotificationSubscriptionDmiInEventMapper {
+public class DmiInEventMapper {
private final InventoryPersistence inventoryPersistence;
/**
* Mapper to form a request for the DMI Plugin for the Cm Notification Subscription.
*
- * @param dmiCmNotificationSubscriptionPredicates Collection of Cm Notification Subscription predicates
- * @return CmNotificationSubscriptionDmiInEvent to be sent to DMI Plugin
+ * @param dmiCmSubscriptionPredicates Collection of Cm Notification Subscription predicates
+ * @return DmiInEvent to be sent to DMI Plugin
*/
- public CmNotificationSubscriptionDmiInEvent toCmNotificationSubscriptionDmiInEvent(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
- new CmNotificationSubscriptionDmiInEvent();
+ public DmiInEvent toDmiInEvent(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final DmiInEvent dmiInEvent = new DmiInEvent();
final Data cmSubscriptionData = new Data();
- cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmNotificationSubscriptionPredicates));
- cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmhandleWithPrivateProperties(
- extractUniqueCmHandleIds(dmiCmNotificationSubscriptionPredicates)));
- cmNotificationSubscriptionDmiInEvent.setData(cmSubscriptionData);
- return cmNotificationSubscriptionDmiInEvent;
+ cmSubscriptionData.setPredicates(mapToDmiInEventPredicates(dmiCmSubscriptionPredicates));
+ cmSubscriptionData.setCmHandles(mapToCmSubscriptionCmHandleWithPrivateProperties(
+ extractUniqueCmHandleIds(dmiCmSubscriptionPredicates)));
+ dmiInEvent.setData(cmSubscriptionData);
+ return dmiInEvent;
}
private List<Predicate> mapToDmiInEventPredicates(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final List<Predicate> predicates = new ArrayList<>();
- dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> {
+ dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> {
final Predicate predicate = new Predicate();
final ScopeFilter scopeFilter = new ScopeFilter();
scopeFilter.setDatastore(ScopeFilter.Datastore.fromValue(
@@ -81,7 +79,7 @@ public class CmNotificationSubscriptionDmiInEventMapper {
}
- private List<CmHandle> mapToCmSubscriptionCmhandleWithPrivateProperties(final Set<String> cmHandleIds) {
+ private List<CmHandle> mapToCmSubscriptionCmHandleWithPrivateProperties(final Set<String> cmHandleIds) {
final List<CmHandle> cmSubscriptionCmHandles = new ArrayList<>();
@@ -99,11 +97,10 @@ public class CmNotificationSubscriptionDmiInEventMapper {
}
- private Set<String> extractUniqueCmHandleIds(
- final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates) {
+ private Set<String> extractUniqueCmHandleIds(final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final Set<String> cmHandleIds = new HashSet<>();
- dmiCmNotificationSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
+ dmiCmSubscriptionPredicates.forEach(dmiCmNotificationSubscriptionPredicate -> cmHandleIds.addAll(
dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds()));
return cmHandleIds;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java
index 3273c556c6..c62916f05c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/producer/CmNotificationSubscriptionDmiInEventProducer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.producer;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
@@ -26,7 +26,7 @@ import java.net.URI;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -35,38 +35,36 @@ import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class CmNotificationSubscriptionDmiInEventProducer {
+public class DmiInEventProducer {
private final EventsPublisher<CloudEvent> eventsPublisher;
private final JsonObjectMapper jsonObjectMapper;
@Value("${app.ncmp.avc.cm-subscription-dmi-in}")
- private String cmNotificationSubscriptionDmiInEventTopic;
+ private String dmiInEventTopic;
/**
* Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
*
- * @param subscriptionId Cm Subscription Id
- * @param dmiPluginName Dmi Plugin Name
- * @param eventType Type of event
- * @param cmNotificationSubscriptionDmiInEvent Cm Notification Subscription event for Dmi
+ * @param subscriptionId Cm Subscription Id
+ * @param dmiPluginName Dmi Plugin Name
+ * @param eventType Type of event
+ * @param dmiInEvent Cm Notification Subscription event for Dmi
*/
- public void publishCmNotificationSubscriptionDmiInEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType, final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent) {
- eventsPublisher.publishCloudEvent(cmNotificationSubscriptionDmiInEventTopic, subscriptionId,
- buildAndGetCmNotificationDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType,
- cmNotificationSubscriptionDmiInEvent));
+ public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName,
+ final String eventType, final DmiInEvent dmiInEvent) {
+ eventsPublisher.publishCloudEvent(dmiInEventTopic, subscriptionId,
+ buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
}
- private CloudEvent buildAndGetCmNotificationDmiInEventAsCloudEvent(final String subscriptionId,
- final String dmiPluginName, final String eventType,
- final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent) {
+ private CloudEvent buildAndGetDmiInEventAsCloudEvent(final String subscriptionId,
+ final String dmiPluginName, final String eventType, final DmiInEvent dmiInEvent) {
return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
.withSource(URI.create("NCMP"))
.withDataSchema(URI.create("org.onap.ncmp.dmi.cm.subscription:1.0.0"))
.withExtension("correlationid", subscriptionId.concat("#").concat(dmiPluginName))
- .withData(jsonObjectMapper.asJsonBytes(cmNotificationSubscriptionDmiInEvent)).build();
+ .withData(jsonObjectMapper.asJsonBytes(dmiInEvent)).build();
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
new file mode 100644
index 0000000000..2a45818624
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumer.java
@@ -0,0 +1,118 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
+
+import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
+import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
+import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.NcmpResponseStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class DmiOutEventConsumer {
+
+ private final DmiCacheHandler dmiCacheHandler;
+ private final EventsFacade eventsFacade;
+ private final MappersFacade mappersFacade;
+
+ private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
+
+ /**
+ * Consume the Cm Notification Subscription event from the dmi-plugin.
+ *
+ * @param dmiOutEventAsConsumerRecord the event to be consumed
+ */
+ @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
+ final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
+ final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class);
+ final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
+ if (dmiOutEvent != null && correlationId != null) {
+ final String eventType = cloudEvent.getType();
+ handleDmiOutEvent(correlationId, eventType, dmiOutEvent);
+ }
+ }
+
+ private void handleDmiOutEvent(final String correlationId, final String eventType,
+ final DmiOutEvent dmiOutEvent) {
+ final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
+ final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
+
+ if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED);
+ if (eventType.equals("subscriptionCreateResponse")) {
+ dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+ if (eventType.equals("subscriptionDeleteResponse")) {
+ dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+ handleEventsStatusPerDmi(subscriptionId, eventType);
+ }
+
+ if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED);
+ handleEventsStatusPerDmi(subscriptionId, eventType);
+ }
+
+ log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
+ dmiPluginName, dmiOutEvent.getData().getStatusMessage());
+ }
+
+ private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
+ final CmSubscriptionStatus cmSubscriptionStatus) {
+ dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ cmSubscriptionStatus);
+ }
+
+ private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId,
+ dmiSubscriptionsPerDmi);
+ eventsFacade.publishNcmpOutEvent(subscriptionId, eventType,
+ ncmpOutEvent, false);
+ }
+
+ private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
+ final Data dmiOutData) {
+ return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode())
+ && ncmpResponseStatus.getMessage()
+ .equals(dmiOutData.getStatusMessage());
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java
index 68d54fac95..5b7c46ed00 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/CmNotificationSubscriptionStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/CmSubscriptionStatus.java
@@ -18,15 +18,15 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.model;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
-public enum CmNotificationSubscriptionStatus {
+public enum CmSubscriptionStatus {
ACCEPTED("ACCEPTED"), REJECTED("REJECTED"), PENDING("PENDING");
private final String cmNotificationSubscriptionStatusValue;
- CmNotificationSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) {
+ CmSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) {
this.cmNotificationSubscriptionStatusValue = cmNotificationSubscriptionStatusValue;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java
index 95757e7240..dbc607ad27 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionDetails.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionDetails.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.model;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
import java.util.List;
import lombok.AllArgsConstructor;
@@ -28,8 +28,8 @@ import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
-public class DmiCmNotificationSubscriptionDetails {
+public class DmiCmSubscriptionDetails {
- private List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicates;
- private CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus;
+ private List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates;
+ private CmSubscriptionStatus cmSubscriptionStatus;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java
index 40c0188fa0..84d3aead8c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/model/DmiCmNotificationSubscriptionPredicate.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/models/DmiCmSubscriptionPredicate.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.model;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.models;
import java.util.Set;
import lombok.AllArgsConstructor;
@@ -29,7 +29,7 @@ import org.onap.cps.ncmp.api.data.models.DatastoreType;
@Getter
@Setter
@AllArgsConstructor
-public class DmiCmNotificationSubscriptionPredicate {
+public class DmiCmSubscriptionPredicate {
private Set<String> targetCmHandleIds;
private DatastoreType datastoreType;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java
new file mode 100644
index 0000000000..d7f15a2c72
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparator.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.data.models.DatastoreType;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class CmSubscriptionComparator {
+
+ private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+
+ /**
+ * Get the new Dmi Predicates for a given predicates list.
+ *
+ * @param existingDmiCmSubscriptionPredicates list of DmiCmNotificationSubscriptionPredicates
+ * @return new list of DmiCmNotificationSubscriptionPredicates
+ */
+ public List<DmiCmSubscriptionPredicate> getNewDmiSubscriptionPredicates(
+ final List<DmiCmSubscriptionPredicate> existingDmiCmSubscriptionPredicates) {
+ final List<DmiCmSubscriptionPredicate> newDmiCmSubscriptionPredicates =
+ new ArrayList<>();
+
+ for (final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate : existingDmiCmSubscriptionPredicates) {
+
+ final Set<String> targetCmHandleIds = new HashSet<>();
+ final Set<String> xpaths = new HashSet<>();
+ final DatastoreType datastoreType = dmiCmSubscriptionPredicate.getDatastoreType();
+
+ for (final String cmHandleId : dmiCmSubscriptionPredicate.getTargetCmHandleIds()) {
+ for (final String xpath : dmiCmSubscriptionPredicate.getXpaths()) {
+ if (!cmSubscriptionPersistenceService.isOngoingCmSubscription(datastoreType,
+ cmHandleId, xpath)) {
+ xpaths.add(xpath);
+ targetCmHandleIds.add(cmHandleId);
+
+ }
+ }
+ }
+
+ populateValidDmiSubscriptionPredicates(targetCmHandleIds, xpaths, datastoreType,
+ newDmiCmSubscriptionPredicates);
+ }
+ return newDmiCmSubscriptionPredicates;
+ }
+
+ private void populateValidDmiSubscriptionPredicates(final Set<String> targetCmHandleIds,
+ final Set<String> xpaths, final DatastoreType datastoreType,
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ if (!(targetCmHandleIds.isEmpty() || xpaths.isEmpty())) {
+ final DmiCmSubscriptionPredicate dmiCmSubscriptionPredicate =
+ new DmiCmSubscriptionPredicate(targetCmHandleIds, datastoreType, xpaths);
+ dmiCmSubscriptionPredicates.add(dmiCmSubscriptionPredicate);
+ }
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
index 1c52ffa798..3a9b2066b2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandler.java
@@ -18,12 +18,12 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
import java.util.List;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
-public interface CmNotificationSubscriptionHandlerService {
+public interface CmSubscriptionHandler {
/**
* Process cm notification subscription create request.
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
new file mode 100644
index 0000000000..e225b705d7
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImpl.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler {
+
+ private final CmSubscriptionPersistenceService cmSubscriptionPersistenceService;
+ private final CmSubscriptionComparator cmSubscriptionComparator;
+ private final MappersFacade mappersFacade;
+ private final EventsFacade eventsFacade;
+ private final DmiCacheHandler dmiCacheHandler;
+
+ @Override
+ public void processSubscriptionCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
+ if (cmSubscriptionPersistenceService.isUniqueSubscriptionId(subscriptionId)) {
+ dmiCacheHandler.add(subscriptionId, predicates);
+ handleNewCmSubscription(subscriptionId);
+ scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
+ } else {
+ rejectAndPublishCreateRequest(subscriptionId, predicates);
+ }
+ }
+
+ @Override
+ public void processSubscriptionDeleteRequest(final String subscriptionId, final List<Predicate> predicates) {
+ dmiCacheHandler.add(subscriptionId, predicates);
+ sendSubscriptionDeleteRequestToDmi(subscriptionId);
+ scheduleNcmpOutEventResponse(subscriptionId, "subscriptionDeleteResponse");
+ }
+
+ private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
+ eventsFacade.publishNcmpOutEvent(subscriptionId, eventType, null, true);
+ }
+
+ private void rejectAndPublishCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
+ final Set<String> subscriptionTargetFilters =
+ predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
+ .collect(Collectors.toSet());
+ final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEventForRejectedRequest(subscriptionId,
+ new ArrayList<>(subscriptionTargetFilters));
+ eventsFacade.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
+ }
+
+ private void handleNewCmSubscription(final String subscriptionId) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
+ cmSubscriptionComparator.getNewDmiSubscriptionPredicates(
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
+
+ if (dmiCmSubscriptionPredicates.isEmpty()) {
+ acceptAndPublishNcmpOutEventPerDmi(subscriptionId, dmiPluginName);
+ } else {
+ publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
+ }
+ });
+ }
+
+ private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final DmiInEvent dmiInEvent = mappersFacade.toDmiInEvent(dmiCmSubscriptionPredicates);
+ eventsFacade.publishDmiInEvent(subscriptionId, dmiPluginName,
+ "subscriptionCreateRequest", dmiInEvent);
+ }
+
+ private void acceptAndPublishNcmpOutEventPerDmi(final String subscriptionId, final String dmiPluginName) {
+ dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
+ CmSubscriptionStatus.ACCEPTED);
+ dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+
+ private void sendSubscriptionDeleteRequestToDmi(final String subscriptionId) {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
+ final DmiInEvent dmiInEvent = mappersFacade.toDmiInEvent(
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates());
+ eventsFacade.publishDmiInEvent(subscriptionId, dmiPluginName,
+ "subscriptionDeleteRequest", dmiInEvent);
+ });
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
index 65f4ee8c89..1e1359dd0d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/consumer/CmNotificationSubscriptionNcmpInEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
@@ -27,44 +27,43 @@ import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionHandlerService;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent;
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
-public class CmNotificationSubscriptionNcmpInEventConsumer {
+public class NcmpInEventConsumer {
- private final CmNotificationSubscriptionHandlerService cmNotificationSubscriptionHandlerService;
+ private final CmSubscriptionHandler cmSubscriptionHandler;
/**
* Consume the specified event.
*
- * @param subscriptionEventConsumerRecord the event to be consumed
+ * @param ncmpInEventAsConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
- final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
- final CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent =
- toTargetEvent(cloudEvent, CmNotificationSubscriptionNcmpInEvent.class);
+ public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> ncmpInEventAsConsumerRecord) {
+ final CloudEvent cloudEvent = ncmpInEventAsConsumerRecord.value();
+ final NcmpInEvent ncmpInEvent =
+ toTargetEvent(cloudEvent, NcmpInEvent.class);
log.info("Subscription with name {} to be mapped to hazelcast object...",
- cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId());
+ ncmpInEvent.getData().getSubscriptionId());
- final String subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId();
- final List<Predicate> predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates();
+ final String subscriptionId = ncmpInEvent.getData().getSubscriptionId();
+ final List<Predicate> predicates = ncmpInEvent.getData().getPredicates();
if ("subscriptionCreateRequest".equals(cloudEvent.getType())) {
log.info("Subscription create request for source {} with subscription id {} ...",
cloudEvent.getSource(), subscriptionId);
- cmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest(subscriptionId, predicates);
+ cmSubscriptionHandler.processSubscriptionCreateRequest(subscriptionId, predicates);
}
if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) {
log.info("Subscription delete request for source {} with subscription id {} ...",
cloudEvent.getSource(), subscriptionId);
- cmNotificationSubscriptionHandlerService.processSubscriptionDeleteRequest(subscriptionId, predicates);
+ cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId, predicates);
}
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java
new file mode 100644
index 0000000000..ffd4b014fb
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapper.java
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class NcmpOutEventMapper {
+
+ /**
+ * Mapper to form a response for the client for the Cm Notification Subscription.
+ *
+ * @param subscriptionId Cm Notification Subscription Id
+ * @param dmiSubscriptionsPerDmi contains CmNotificationSubscriptionDetails per dmi plugin
+ * @return CmNotificationSubscriptionNcmpOutEvent to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEvent(final String subscriptionId,
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi) {
+
+ final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
+ final Data cmSubscriptionData = new Data();
+ cmSubscriptionData.setSubscriptionId(subscriptionId);
+ populateNcmpOutEventWithCmHandleIds(dmiSubscriptionsPerDmi,
+ cmSubscriptionData);
+ ncmpOutEvent.setData(cmSubscriptionData);
+
+ return ncmpOutEvent;
+ }
+
+ /**
+ * Mapper to form a rejected response for the client for the Cm Notification Subscription Request.
+ *
+ * @param subscriptionId subscription id
+ * @param rejectedTargetFilters list of rejected target filters for the subscription request
+ * @return to sent back to the client
+ */
+ public NcmpOutEvent toNcmpOutEventForRejectedRequest(final String subscriptionId,
+ final List<String> rejectedTargetFilters) {
+ final NcmpOutEvent ncmpOutEvent = new NcmpOutEvent();
+ final Data cmSubscriptionData = new Data();
+ cmSubscriptionData.setSubscriptionId(subscriptionId);
+ cmSubscriptionData.setRejectedTargets(rejectedTargetFilters);
+ ncmpOutEvent.setData(cmSubscriptionData);
+ return ncmpOutEvent;
+ }
+
+ private void populateNcmpOutEventWithCmHandleIds(
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi,
+ final Data cmSubscriptionData) {
+
+ final List<String> acceptedCmHandleIds = new ArrayList<>();
+ final List<String> pendingCmHandleIds = new ArrayList<>();
+ final List<String> rejectedCmHandleIds = new ArrayList<>();
+
+ dmiSubscriptionsPerDmi.forEach((dmiPluginName, dmiSubscriptionDetails) -> {
+ final CmSubscriptionStatus cmSubscriptionStatus =
+ dmiSubscriptionDetails.getCmSubscriptionStatus();
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates =
+ dmiSubscriptionDetails.getDmiCmSubscriptionPredicates();
+
+ switch (cmSubscriptionStatus) {
+ case ACCEPTED -> acceptedCmHandleIds.addAll(
+ extractCmHandleIds(dmiCmSubscriptionPredicates));
+ case PENDING -> pendingCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
+ default -> rejectedCmHandleIds.addAll(extractCmHandleIds(dmiCmSubscriptionPredicates));
+ }
+ });
+
+ cmSubscriptionData.setAcceptedTargets(acceptedCmHandleIds);
+ cmSubscriptionData.setPendingTargets(pendingCmHandleIds);
+ cmSubscriptionData.setRejectedTargets(rejectedCmHandleIds);
+
+ }
+
+ private List<String> extractCmHandleIds(
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ final List<String> cmHandleIds = new ArrayList<>();
+ dmiCmSubscriptionPredicates.forEach(dmiSubscriptionPredicate -> cmHandleIds.addAll(
+ dmiSubscriptionPredicate.getTargetCmHandleIds()));
+
+ return cmHandleIds;
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java
new file mode 100644
index 0000000000..92800f4af1
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducer.java
@@ -0,0 +1,135 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.utils.JsonObjectMapper;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
+public class NcmpOutEventProducer {
+
+ @Value("${app.ncmp.avc.cm-subscription-ncmp-out}")
+ private String ncmpOutEventTopic;
+
+ @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
+ private Integer dmiOutEventTimeoutInMs;
+
+ private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final JsonObjectMapper jsonObjectMapper;
+ private final MappersFacade mappersFacade;
+ private final DmiCacheHandler dmiCacheHandler;
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private static final Map<String, ScheduledFuture<?>> scheduledTasksPerSubscriptionId = new ConcurrentHashMap<>();
+
+ /**
+ * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
+ * Event compliant.
+ *
+ * @param subscriptionId Cm Subscription Id
+ * @param eventType Type of event
+ * @param ncmpOutEvent Cm Notification Subscription Event for the
+ * client
+ * @param isScheduledEvent Determines if the event is to be scheduled
+ * or published now
+ */
+ public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
+
+ if (isScheduledEvent && !scheduledTasksPerSubscriptionId.containsKey(subscriptionId)) {
+ final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType);
+ scheduledTasksPerSubscriptionId.putIfAbsent(subscriptionId, scheduledFuture);
+ log.debug("Scheduled the CmNotificationSubscriptionEvent for subscriptionId : {}", subscriptionId);
+ } else {
+ cancelScheduledTaskForSubscriptionId(subscriptionId);
+ publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
+ log.info("Published CmNotificationSubscriptionEvent on demand for subscriptionId : {}", subscriptionId);
+ }
+ }
+
+ private ScheduledFuture<?> scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) {
+ final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
+ new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher,
+ jsonObjectMapper, mappersFacade, dmiCacheHandler);
+ return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void cancelScheduledTaskForSubscriptionId(final String subscriptionId) {
+
+ final ScheduledFuture<?> scheduledFuture = scheduledTasksPerSubscriptionId.get(subscriptionId);
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ scheduledTasksPerSubscriptionId.remove(subscriptionId);
+ }
+
+ }
+
+
+ private void publishNcmpOutEventNow(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent) {
+ final CloudEvent ncmpOutEventAsCloudEvent =
+ buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
+ ncmpOutEvent);
+ eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId,
+ ncmpOutEventAsCloudEvent);
+ dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
+ }
+
+ /**
+ * Get an NCMP out event as cloud event.
+ *
+ * @param jsonObjectMapper JSON object mapper
+ * @param subscriptionId subscription id
+ * @param eventType event type
+ * @param ncmpOutEvent cm notification subscription NCMP out event
+ * @return cm notification subscription NCMP out event as cloud event
+ */
+ public static CloudEvent buildAndGetNcmpOutEventAsCloudEvent(final JsonObjectMapper jsonObjectMapper,
+ final String subscriptionId, final String eventType, final NcmpOutEvent ncmpOutEvent) {
+
+ return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withType(eventType)
+ .withSource(URI.create("NCMP")).withDataSchema(URI.create("org.onap.ncmp.cm.subscription:1.0.0"))
+ .withExtension("correlationid", subscriptionId)
+ .withData(jsonObjectMapper.asJsonBytes(ncmpOutEvent)).build();
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java
new file mode 100644
index 0000000000..5636237566
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventPublishingTask.java
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
+
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer.buildAndGetNcmpOutEventAsCloudEvent;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
+import org.onap.cps.utils.JsonObjectMapper;
+
+@Slf4j
+@RequiredArgsConstructor
+public class NcmpOutEventPublishingTask implements Runnable {
+
+ private final String topicName;
+ private final String subscriptionId;
+ private final String eventType;
+ private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final JsonObjectMapper jsonObjectMapper;
+ private final MappersFacade mappersFacade;
+ private final DmiCacheHandler dmiCacheHandler;
+
+ /**
+ * Delegating the responsibility of publishing NcmpOutEvent as a separate task which will
+ * be called after a specified delay.
+ */
+ @Override
+ public void run() {
+ final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
+ dmiCacheHandler.get(subscriptionId);
+ final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId,
+ dmiSubscriptionsPerDmi);
+ eventsPublisher.publishCloudEvent(topicName, subscriptionId,
+ buildAndGetNcmpOutEventAsCloudEvent(jsonObjectMapper, subscriptionId, eventType,
+ ncmpOutEvent));
+ dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
index e2480c5e56..c24507a1a7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceService.java
@@ -1,7 +1,6 @@
/*
* ============LICENSE_START=======================================================
* Copyright (C) 2024 Nordix Foundation
- * Modifications Copyright (C) 2024 TechMahindra Ltd.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service;
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils;
import static org.onap.cps.spi.FetchDescendantsOption.DIRECT_CHILDREN_ONLY;
import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS;
@@ -43,7 +42,10 @@ import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
-public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotificationSubscriptionPersistenceService {
+public class CmSubscriptionPersistenceService {
+
+ private static final String NCMP_DATASPACE_NAME = "NCMP-Admin";
+ private static final String CM_SUBSCRIPTIONS_ANCHOR_NAME = "cm-data-subscriptions";
private static final String SUBSCRIPTION_ANCHOR_NAME = "cm-data-subscriptions";
private static final String CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE = """
@@ -64,22 +66,40 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
private final CpsQueryService cpsQueryService;
private final CpsDataService cpsDataService;
- @Override
- public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
- return !getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
+ /**
+ * Check if we have an ongoing cm subscription based on the parameters.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @return true for ongoing cmsubscription , otherwise false
+ */
+ public boolean isOngoingCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath) {
+ return !getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
}
- @Override
+ /**
+ * Check if the subscription ID is unique against ongoing subscriptions.
+ *
+ * @param subscriptionId subscription ID
+ * @return true if subscriptionId is not used in active subscriptions, otherwise false
+ */
public boolean isUniqueSubscriptionId(final String subscriptionId) {
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId),
- OMIT_DESCENDANTS).isEmpty();
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted(subscriptionId), OMIT_DESCENDANTS).isEmpty();
}
- @Override
- public Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath) {
+ /**
+ * Get all ongoing cm notification subscription based on the parameters.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @return collection of subscription ids of ongoing cm notification subscription
+ */
+ public Collection<String> getOngoingCmSubscriptionIds(final DatastoreType datastoreType,
+ final String cmHandleId, final String xpath) {
final String isOngoingCmSubscriptionCpsPathQuery =
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
@@ -93,24 +113,38 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
return (List<String>) existingNodes.iterator().next().getLeaves().get("subscriptionIds");
}
- @Override
- public void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId) {
- final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
- cmHandleId, xpath);
+ /**
+ * Add cm notification subscription.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @param newSubscriptionId subscription id to be added
+ */
+ public void addCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String newSubscriptionId) {
+ final Collection<String> subscriptionIds =
+ getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
if (subscriptionIds.isEmpty()) {
- addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, subscriptionId);
- } else if (!subscriptionIds.contains(subscriptionId)) {
- subscriptionIds.add(subscriptionId);
+ addFirstSubscriptionForDatastoreCmHandleAndXpath(datastoreType, cmHandleId, xpath, newSubscriptionId);
+ } else if (!subscriptionIds.contains(newSubscriptionId)) {
+ subscriptionIds.add(newSubscriptionId);
saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
}
}
- @Override
- public void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String subscriptionId) {
- final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
- cmHandleId, xpath);
+ /**
+ * Remove cm notification Subscription.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @param subscriptionId subscription id to remove
+ */
+ public void removeCmSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String subscriptionId) {
+ final Collection<String> subscriptionIds =
+ getOngoingCmSubscriptionIds(datastoreType, cmHandleId, xpath);
if (subscriptionIds.remove(subscriptionId)) {
saveSubscriptionDetails(datastoreType, cmHandleId, xpath, subscriptionIds);
log.info("There are subscribers left for the following cps path {} :",
@@ -126,16 +160,17 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
}
private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
+ final String xpath) {
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted(
datastoreType.getDatastoreName(), cmHandleId, escapeQuotesByDoublingThem(xpath)),
OffsetDateTime.now());
final Collection<DataNode> existingFiltersForCmHandle =
cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- DIRECT_CHILDREN_ONLY).iterator().next().getChildDataNodes();
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
+ datastoreType.getDatastoreName(), cmHandleId),
+ DIRECT_CHILDREN_ONLY).iterator().next()
+ .getChildDataNodes();
if (existingFiltersForCmHandle.isEmpty()) {
removeCmHandleFromDatastore(datastoreType.getDatastoreName(), cmHandleId);
}
@@ -143,46 +178,43 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
private void removeCmHandleFromDatastore(final String datastoreName, final String cmHandleId) {
cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreName, cmHandleId), OffsetDateTime.now());
+ CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_AND_CMHANDLE.formatted(datastoreName, cmHandleId),
+ OffsetDateTime.now());
}
private boolean isFirstSubscriptionForCmHandle(final DatastoreType datastoreType, final String cmHandleId) {
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- OMIT_DESCENDANTS).isEmpty();
+ datastoreType.getDatastoreName(), cmHandleId), OMIT_DESCENDANTS).isEmpty();
}
private void addFirstSubscriptionForDatastoreCmHandleAndXpath(final DatastoreType datastoreType,
- final String cmHandleId,
- final String xpath,
- final String subscriptionId) {
+ final String cmHandleId, final String xpath, final String subscriptionId) {
final Collection<String> newSubscriptionList = Collections.singletonList(subscriptionId);
final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, newSubscriptionList);
if (isFirstSubscriptionForCmHandle(datastoreType, cmHandleId)) {
- final String parentXpath = "/datastores/datastore[@name='%s']/cm-handles"
- .formatted(datastoreType.getDatastoreName());
- final String subscriptionAsJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}",
- cmHandleId, subscriptionDetailsAsJson);
+ final String parentXpath =
+ "/datastores/datastore[@name='%s']/cm-handles".formatted(datastoreType.getDatastoreName());
+ final String subscriptionAsJson =
+ String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":%s}]}", cmHandleId,
+ subscriptionDetailsAsJson);
cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson,
OffsetDateTime.now(), ContentType.JSON);
} else {
cpsDataService.saveListElements(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId),
- subscriptionDetailsAsJson, OffsetDateTime.now());
+ datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson,
+ OffsetDateTime.now());
}
}
- private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath,
- final Collection<String> subscriptionIds) {
+ private void saveSubscriptionDetails(final DatastoreType datastoreType, final String cmHandleId, final String xpath,
+ final Collection<String> subscriptionIds) {
final String subscriptionDetailsAsJson = getSubscriptionDetailsAsJson(xpath, subscriptionIds);
cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, CM_SUBSCRIPTIONS_ANCHOR_NAME,
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted(
- datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson,
- OffsetDateTime.now(), ContentType.JSON);
+ datastoreType.getDatastoreName(), cmHandleId), subscriptionDetailsAsJson, OffsetDateTime.now(),
+ ContentType.JSON);
}
private String getSubscriptionDetailsAsJson(final String xpath, final Collection<String> subscriptionIds) {
@@ -194,4 +226,6 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
private static String escapeQuotesByDoublingThem(final String inputXpath) {
return inputXpath.replace("'", "''");
}
+
}
+
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/OpenTelemetryConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy
index 07395cf5bc..07395cf5bc 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/OpenTelemetryConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDeltaSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDeltaSpec.groovy
deleted file mode 100644
index 89ccc7e7dc..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDeltaSpec.groovy
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
-
-import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
-import spock.lang.Specification
-
-class CmNotificationSubscriptionDeltaSpec extends Specification {
-
- def mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService)
- def objectUnderTest = new CmNotificationSubscriptionDelta(mockCmNotificationSubscriptionPersistenceService)
-
- def 'Find Delta of given list of predicates'() {
- given: 'A list of predicates'
- def predicateList = [new DmiCmNotificationSubscriptionPredicate(['ch-1','ch-2'].toSet(), DatastoreType.PASSTHROUGH_OPERATIONAL, ['a/1/','b/2'].toSet())]
- and: '3 positive responses and 1 negative.'
- mockCmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(DatastoreType.PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
- mockCmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(DatastoreType.PASSTHROUGH_OPERATIONAL, 'ch-1', 'b/2') >>> true
- mockCmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(DatastoreType.PASSTHROUGH_OPERATIONAL, 'ch-2', 'a/1/') >>> true
- mockCmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(DatastoreType.PASSTHROUGH_OPERATIONAL, 'ch-2', 'b/2') >>> false
- when: 'getDelta is called'
- def result = objectUnderTest.getDelta(predicateList)
- then: 'verify correct delta is returned'
- assert result.size() == 1
- assert result[0].targetCmHandleIds[0] == 'ch-2'
- assert result[0].xpaths[0] == 'b/2'
-
- }
-
- def 'Find Delta of given list of predicates when it is an ongoing Cm Subscription'() {
- given: 'A list of predicates'
- def predicateList = [new DmiCmNotificationSubscriptionPredicate(['ch-1'].toSet(), DatastoreType.PASSTHROUGH_OPERATIONAL, ['a/1/'].toSet())]
- and: 'its already present'
- mockCmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(DatastoreType.PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
- when: 'getDelta is called'
- def result = objectUnderTest.getDelta(predicateList)
- then: 'verify correct delta is returned'
- assert result.size() == 0
- }
-
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy
deleted file mode 100644
index 1fb5837eb3..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpOutEventProducerSpec.groovy
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import org.onap.cps.events.EventsPublisher
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.Data
-import org.onap.cps.ncmp.utils.events.CloudEventMapper
-import org.onap.cps.utils.JsonObjectMapper
-import spock.lang.Specification
-
-class CmNotificationSubscriptionNcmpOutEventProducerSpec extends Specification {
-
- def mockEventsPublisher = Mock(EventsPublisher)
- def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
- def mockCmNotificationSubscriptionMappersHandler = Mock(CmNotificationSubscriptionMappersHandler)
- def mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
-
- def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper,
- mockCmNotificationSubscriptionMappersHandler, mockDmiCmNotificationSubscriptionCacheHandler)
-
- def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
- given: 'a cm subscription response for the client'
- def subscriptionId = 'test-subscription-id-2'
- def eventType = 'subscriptionCreateResponse'
- def cmNotificationSubscriptionNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2']))
- and: 'also we have target topic for publishing to client'
- objectUnderTest.cmNotificationSubscriptionNcmpOutEventTopic = 'client-test-topic'
- and: 'a deadline to an event'
- objectUnderTest.cmNotificationSubscriptionDmiOutEventTimeoutInMs = 1000
- when: 'the event is published'
- objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent, eventPublishingTaskToBeScheduled)
- then: 'we conditionally wait for a while'
- Thread.sleep(delayInMs)
- then: 'the event contains the required attributes'
- 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
- args ->
- {
- assert args[0] == 'client-test-topic'
- assert args[1] == subscriptionId
- def cmNotificationSubscriptionNcmpOutEventAsCloudEvent = (args[2] as CloudEvent)
- assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
- assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
- assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.source.toString() == 'NCMP'
- assert CloudEventMapper.toTargetEvent(cmNotificationSubscriptionNcmpOutEventAsCloudEvent, CmNotificationSubscriptionNcmpOutEvent) == cmNotificationSubscriptionNcmpOutEvent
- }
- }
- where: 'following scenarios are considered'
- scenario | delayInMs | eventPublishingTaskToBeScheduled
- 'publish event now' | 0 | false
- 'schedule and publish after the configured time ' | 1500 | true
- }
-
- def 'Schedule Cm Notification Subscription NCMP out event but later publish it on demand'() {
- given: 'a cm subscription response for the client'
- def subscriptionId = 'test-subscription-id-3'
- def eventType = 'subscriptionCreateResponse'
- def cmNotificationSubscriptionNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3']))
- and: 'also we have target topic for publishing to client'
- objectUnderTest.cmNotificationSubscriptionNcmpOutEventTopic = 'client-test-topic'
- and: 'a deadline to an event'
- objectUnderTest.cmNotificationSubscriptionDmiOutEventTimeoutInMs = 1000
- when: 'the event is scheduled to be published'
- objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent, true)
- then: 'we wait for 10ms and then we receive response from DMI'
- Thread.sleep(10)
- and: 'we receive response from DMI so we publish the message on demand'
- objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId, eventType, cmNotificationSubscriptionNcmpOutEvent, false)
- then: 'the event contains the required attributes'
- 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
- args ->
- {
- assert args[0] == 'client-test-topic'
- assert args[1] == subscriptionId
- def cmNotificationSubscriptionNcmpOutEventAsCloudEvent = (args[2] as CloudEvent)
- assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
- assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
- assert cmNotificationSubscriptionNcmpOutEventAsCloudEvent.source.toString() == 'NCMP'
- assert CloudEventMapper.toTargetEvent(cmNotificationSubscriptionNcmpOutEventAsCloudEvent, CmNotificationSubscriptionNcmpOutEvent) == cmNotificationSubscriptionNcmpOutEvent
- }
- }
- then: 'the cache handler is called once to remove accepted and rejected entries in cache'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries(subscriptionId)
- }
-
-
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapperSpec.groovy
deleted file mode 100644
index 179cf361d9..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionNcmpOutEventMapperSpec.groovy
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper
-
-import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate
-import spock.lang.Specification
-
-class CmNotificationSubscriptionNcmpOutEventMapperSpec extends Specification {
-
- static Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsMap
-
- def objectUnderTest = new CmNotificationSubscriptionNcmpOutEventMapper()
-
- def setup() {
- def dmiCmNotificationSubscriptionPredicateA = new DmiCmNotificationSubscriptionPredicate(['ch-A'] as Set, DatastoreType.PASSTHROUGH_RUNNING, ['/a'] as Set)
- def dmiCmNotificationSubscriptionPredicateB = new DmiCmNotificationSubscriptionPredicate(['ch-B'] as Set, DatastoreType.PASSTHROUGH_OPERATIONAL, ['/b'] as Set)
- def dmiCmNotificationSubscriptionPredicateC = new DmiCmNotificationSubscriptionPredicate(['ch-C'] as Set, DatastoreType.PASSTHROUGH_OPERATIONAL, ['/c'] as Set)
- dmiCmNotificationSubscriptionDetailsMap = ['dmi-1': new DmiCmNotificationSubscriptionDetails([dmiCmNotificationSubscriptionPredicateA], CmNotificationSubscriptionStatus.PENDING),
- 'dmi-2': new DmiCmNotificationSubscriptionDetails([dmiCmNotificationSubscriptionPredicateB], CmNotificationSubscriptionStatus.ACCEPTED),
- 'dmi-3': new DmiCmNotificationSubscriptionDetails([dmiCmNotificationSubscriptionPredicateC], CmNotificationSubscriptionStatus.REJECTED)
- ]
- }
-
- def 'Check for Cm Notification Subscription Outgoing event mapping'() {
- when: 'we try to map the event to send it to client'
- def result = objectUnderTest.toCmNotificationSubscriptionNcmpOutEvent('test-subscription', dmiCmNotificationSubscriptionDetailsMap)
- then: 'event is mapped correctly for the subscription'
- result.data.subscriptionId == 'test-subscription'
- and: 'the cm handle ids are part of correct list'
- result.data.pendingTargets == ['ch-A']
- result.data.acceptedTargets == ['ch-B']
- result.data.rejectedTargets == ['ch-C']
- }
-
- def 'Check for Cm Notification Rejected Subscription Outgoing event mapping'() {
- when: 'we try to map the event to send it to client'
- def result = objectUnderTest.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest('test-subscription', ['ch-1', 'ch-2'])
- then: 'event is mapped correctly for the subscription id'
- result.data.subscriptionId == 'test-subscription'
- and: 'the cm handle ids are part of correct list'
- result.data.withRejectedTargets(['ch-1', 'ch-2'])
- }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy
deleted file mode 100644
index 55a817ed67..0000000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionHandlerServiceImplSpec.groovy
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an 'AS IS' BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionDelta
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent
-import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.utils.JsonObjectMapper
-import spock.lang.Specification
-
-class CmNotificationSubscriptionHandlerServiceImplSpec extends Specification{
-
- def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
- def mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService);
- def mockCmNotificationSubscriptionDelta = Mock(CmNotificationSubscriptionDelta);
- def mockCmNotificationSubscriptionMappersHandler = Mock(CmNotificationSubscriptionMappersHandler);
- def mockCmNotificationSubscriptionEventsHandler = Mock(CmNotificationSubscriptionEventsHandler);
- def mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler);
-
- def objectUnderTest = new CmNotificationSubscriptionHandlerServiceImpl(mockCmNotificationSubscriptionPersistenceService,
- mockCmNotificationSubscriptionDelta, mockCmNotificationSubscriptionMappersHandler,
- mockCmNotificationSubscriptionEventsHandler, mockDmiCmNotificationSubscriptionCacheHandler)
-
- def testSubscriptionDetailsMap = ["dmi-1":new DmiCmNotificationSubscriptionDetails([], CmNotificationSubscriptionStatus.PENDING)]
-
- def 'Consume valid and unique CmNotificationSubscriptionNcmpInEvent create message'() {
- given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
- def testListOfDeltaPredicates = [new DmiCmNotificationSubscriptionPredicate(['ch1'].toSet(), DatastoreType.PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())]
- mockCmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.get("test-id") >> testSubscriptionDetailsMap
- and: 'the delta predicates is returned'
- 1 * mockCmNotificationSubscriptionDelta.getDelta(_) >> testListOfDeltaPredicates
- and: 'the DMI in event mapper returns cm notification subscription event'
- def testDmiInEvent = new CmNotificationSubscriptionDmiInEvent()
- 1 * mockCmNotificationSubscriptionMappersHandler
- .toCmNotificationSubscriptionDmiInEvent(testListOfDeltaPredicates) >> testDmiInEvent
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.add('test-id',_)
- and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
- testSubscriptionDetailsMap.size() * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(
- "test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(
- "test-id", "subscriptionCreateResponse", null, true)
- }
-
- def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() {
- given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
- def noDeltaPredicates = []
- mockCmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.get("test-id") >> testSubscriptionDetailsMap
- and: 'the delta predicates is returned'
- 1 * mockCmNotificationSubscriptionDelta.getDelta(_) >> noDeltaPredicates
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionCreateRequest('test-id', noDeltaPredicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.add('test-id', _)
- and: 'the subscription details are updated in the cache'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi('test-id', _, CmNotificationSubscriptionStatus.ACCEPTED)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(
- "test-id", "subscriptionCreateResponse", null, true)
- }
-
- def 'Consume valid and but non-unique CmNotificationSubscription create message'() {
- given: 'a cmNotificationSubscriptionNcmp in event'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
- mockCmNotificationSubscriptionPersistenceService.isUniqueSubscriptionId('test-id') >> false
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the NCMP out in event mapper returns an event for rejected request'
- def testNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent()
- 1 * mockCmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
- "test-id",_) >> testNcmpOutEvent
- when: 'the valid but non-unique event is consumed'
- objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
- then: 'the events handler method to publish DMI event is never called'
- 0 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(_,_,_,_)
- and: 'the events handler method to publish NCMP out event is called once'
- 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(
- 'test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
- }
-
- def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
- given: 'a cmNotificationSubscriptionNcmp in event for delete'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
- and: 'relevant details is extracted from the event'
- def subscriptionId = testEventConsumed.getData().getSubscriptionId()
- def predicates = testEventConsumed.getData().getPredicates()
- and: 'the cache handler returns for relevant subscription id'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.get('test-id') >> testSubscriptionDetailsMap
- when: 'the valid and unique event is consumed'
- objectUnderTest.processSubscriptionDeleteRequest(subscriptionId, predicates)
- then: 'the subscription cache handler is called once'
- 1 * mockDmiCmNotificationSubscriptionCacheHandler.add('test-id', predicates)
- and: 'the mapper handler to get DMI in event is called once'
- 1 * mockCmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionDmiInEvent(_)
- and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
- testSubscriptionDetailsMap.size() * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionDmiInEvent(
- 'test-id', 'dmi-1', 'subscriptionDeleteRequest', _)
- and: 'we schedule to send the response after configured time from the cache'
- 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(
- 'test-id', 'subscriptionDeleteResponse', null, true)
- }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacadeSpec.groovy
index 788a7a7da5..bc2df10ce3 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionEventsHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/EventsFacadeSpec.groovy
@@ -18,41 +18,41 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
+package org.onap.cps.ncmp.impl.cmnotificationsubscription
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionNcmpOutEventProducer
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionDmiInEventProducer
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
-import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventProducer
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
import spock.lang.Specification
-class CmNotificationSubscriptionEventsHandlerSpec extends Specification {
+class EventsFacadeSpec extends Specification {
- def mockCmNotificationSubscriptionNcmpOutEventProducer = Mock(CmNotificationSubscriptionNcmpOutEventProducer)
- def mockCmNotificationSubscriptionDmiInEventProducer = Mock(CmNotificationSubscriptionDmiInEventProducer)
+ def mockCmNotificationSubscriptionNcmpOutEventProducer = Mock(NcmpOutEventProducer)
+ def mockCmNotificationSubscriptionDmiInEventProducer = Mock(DmiInEventProducer)
- def objectUnderTest = new CmNotificationSubscriptionEventsHandler(mockCmNotificationSubscriptionNcmpOutEventProducer,
+ def objectUnderTest = new EventsFacade(mockCmNotificationSubscriptionNcmpOutEventProducer,
mockCmNotificationSubscriptionDmiInEventProducer)
def 'Publish cm notification subscription ncmp out event'() {
given: 'an ncmp out event'
- def cmNotificationSubscriptionNcmpOutEvent = new CmNotificationSubscriptionNcmpOutEvent()
+ def ncmpOutEvent = new NcmpOutEvent()
when: 'the method to publish cm notification subscription ncmp out event is called'
- objectUnderTest.publishCmNotificationSubscriptionNcmpOutEvent("some-id",
- "some-event", cmNotificationSubscriptionNcmpOutEvent, true)
+ objectUnderTest.publishNcmpOutEvent("some-id",
+ "some-event", ncmpOutEvent, true)
then: 'the parameters is delegated to the correct method once'
- 1 * mockCmNotificationSubscriptionNcmpOutEventProducer.publishCmNotificationSubscriptionNcmpOutEvent(
- "some-id", "some-event", cmNotificationSubscriptionNcmpOutEvent, true)
+ 1 * mockCmNotificationSubscriptionNcmpOutEventProducer.publishNcmpOutEvent(
+ "some-id", "some-event", ncmpOutEvent, true)
}
def 'Publish cm notification subscription dmi in event'() {
given: 'a dmi in event'
- def cmNotificationSubscriptionDmiInEvent = new CmNotificationSubscriptionDmiInEvent()
+ def dmiInEvent = new DmiInEvent()
when: 'the method to publish cm notification subscription ncmp out event is called'
- objectUnderTest.publishCmNotificationSubscriptionDmiInEvent("some-id",
- "some-dmi", "some-event", cmNotificationSubscriptionDmiInEvent)
+ objectUnderTest.publishDmiInEvent("some-id",
+ "some-dmi", "some-event", dmiInEvent)
then: 'the parameters is delegated to the correct method once'
- 1 * mockCmNotificationSubscriptionDmiInEventProducer.publishCmNotificationSubscriptionDmiInEvent("some-id",
- "some-dmi", "some-event", cmNotificationSubscriptionDmiInEvent)
+ 1 * mockCmNotificationSubscriptionDmiInEventProducer.publishDmiInEvent("some-id",
+ "some-dmi", "some-event", dmiInEvent)
}
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacadeSpec.groovy
index bdc54bd1ee..79b9ad578f 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionMappersHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/MappersFacadeSpec.groovy
@@ -18,36 +18,37 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
+package org.onap.cps.ncmp.impl.cmnotificationsubscription
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionDmiInEventMapper
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper.CmNotificationSubscriptionNcmpOutEventMapper
+
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi.DmiInEventMapper
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper
import spock.lang.Specification
-class CmNotificationSubscriptionMappersHandlerSpec extends Specification{
+class MappersFacadeSpec extends Specification{
- def mockCmNotificationDmiInEventMapper = Mock(CmNotificationSubscriptionDmiInEventMapper)
- def mockCmNotificationNcmpOutEventMapper = Mock(CmNotificationSubscriptionNcmpOutEventMapper)
+ def mockCmNotificationDmiInEventMapper = Mock(DmiInEventMapper)
+ def mockCmNotificationNcmpOutEventMapper = Mock(NcmpOutEventMapper)
- def objectUnderTest = new CmNotificationSubscriptionMappersHandler(mockCmNotificationDmiInEventMapper,
+ def objectUnderTest = new MappersFacade(mockCmNotificationDmiInEventMapper,
mockCmNotificationNcmpOutEventMapper)
def 'Get cm notification subscription DMI in event'() {
given: 'a list of predicates'
def testListOfPredicates = []
when: 'method to create a cm notification subscription dmi in event is called with predicates'
- objectUnderTest.toCmNotificationSubscriptionDmiInEvent(testListOfPredicates)
+ objectUnderTest.toDmiInEvent(testListOfPredicates)
then: 'the parameters is delegated to the correct dmi in event mapper method'
- 1 * mockCmNotificationDmiInEventMapper.toCmNotificationSubscriptionDmiInEvent(testListOfPredicates)
+ 1 * mockCmNotificationDmiInEventMapper.toDmiInEvent(testListOfPredicates)
}
def 'Get cm notification subscription ncmp out event'() {
given: 'a subscription details map'
def testSubscriptionDetailsMap = [:]
when: 'method to create cm notification subscription ncmp out event is called with the following parameters'
- objectUnderTest.toCmNotificationSubscriptionNcmpOutEvent("test-id", testSubscriptionDetailsMap)
+ objectUnderTest.toNcmpOutEvent("test-id", testSubscriptionDetailsMap)
then: 'the parameters is delegated to the correct ncmp out event mapper method'
- 1 * mockCmNotificationNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEvent("test-id",
+ 1 * mockCmNotificationNcmpOutEventMapper.toNcmpOutEvent("test-id",
testSubscriptionDetailsMap)
}
@@ -55,10 +56,10 @@ class CmNotificationSubscriptionMappersHandlerSpec extends Specification{
given: 'a list of target filters'
def testRejectedTargetFilters = []
when: 'method to create cm notification subscription ncmp out event is called with the following parameters'
- objectUnderTest.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
+ objectUnderTest.toNcmpOutEventForRejectedRequest(
"test-id", testRejectedTargetFilters)
then: 'the parameters is delegated to the correct ncmp out event mapper method'
- 1 * mockCmNotificationNcmpOutEventMapper.toCmNotificationSubscriptionNcmpOutEventForRejectedRequest(
+ 1 * mockCmNotificationNcmpOutEventMapper.toNcmpOutEventForRejectedRequest(
"test-id", testRejectedTargetFilters)
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfigSpec.groovy
index adb1dfda24..915bccb53d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/CmNotificationSubscriptionCacheConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/CmSubscriptionConfigSpec.groovy
@@ -18,23 +18,23 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.config.embeddedcache
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache
import com.hazelcast.core.Hazelcast
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import spock.lang.Specification
-@SpringBootTest(classes = [CmNotificationSubscriptionCacheConfig])
-class CmNotificationSubscriptionCacheConfigSpec extends Specification {
+@SpringBootTest(classes = [CmSubscriptionConfig])
+class CmSubscriptionConfigSpec extends Specification {
@Autowired
- IMap<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
+ IMap<String, Map<String, DmiCmSubscriptionDetails>> cmNotificationSubscriptionCache;
def 'Embedded (hazelcast) cache for Cm Notification Subscription Cache.'() {
expect: 'system is able to create an instance of the Cm Notification Subscription Cache'
@@ -49,15 +49,15 @@ class CmNotificationSubscriptionCacheConfigSpec extends Specification {
given: 'a cm subscription properties'
def subscriptionId = 'sub123'
def dmiPluginName = 'dummydmi'
- def cmSubscriptionPredicate = new DmiCmNotificationSubscriptionPredicate(['cmhandle1', 'cmhandle2'].toSet(), DatastoreType.PASSTHROUGH_RUNNING, ['/a/b/c'].toSet())
- def cmSubscriptionCacheObject = new DmiCmNotificationSubscriptionDetails([cmSubscriptionPredicate], CmNotificationSubscriptionStatus.PENDING)
+ def cmSubscriptionPredicates = new DmiCmSubscriptionPredicate(['cmhandle1', 'cmhandle2'].toSet(), DatastoreType.PASSTHROUGH_RUNNING, ['/a/b/c'].toSet())
+ def cmSubscriptionCacheObject = new DmiCmSubscriptionDetails([cmSubscriptionPredicates], CmSubscriptionStatus.PENDING)
when: 'the cache is populated'
cmNotificationSubscriptionCache.put(subscriptionId, [(dmiPluginName): cmSubscriptionCacheObject])
then: 'the values are present in memory'
assert cmNotificationSubscriptionCache.get(subscriptionId) != null
and: 'properties match'
assert dmiPluginName == cmNotificationSubscriptionCache.get(subscriptionId).keySet()[0]
- assert cmSubscriptionCacheObject.cmNotificationSubscriptionStatus == cmNotificationSubscriptionCache.get(subscriptionId).values().cmNotificationSubscriptionStatus[0]
- assert cmSubscriptionCacheObject.dmiCmNotificationSubscriptionPredicates[0].targetCmHandleIds == cmNotificationSubscriptionCache.get(subscriptionId).values().dmiCmNotificationSubscriptionPredicates[0].targetCmHandleIds[0]
+ assert cmSubscriptionCacheObject.cmSubscriptionStatus == cmNotificationSubscriptionCache.get(subscriptionId).values().cmSubscriptionStatus[0]
+ assert cmSubscriptionCacheObject.dmiCmSubscriptionPredicates[0].targetCmHandleIds == cmNotificationSubscriptionCache.get(subscriptionId).values().dmiCmSubscriptionPredicates[0].targetCmHandleIds[0]
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy
index 7d4bd73aa1..b335843bb3 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cache/DmiCacheHandlerSpec.groovy
@@ -18,17 +18,17 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cache
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import org.onap.cps.ncmp.utils.TestUtils
@@ -40,7 +40,7 @@ import org.springframework.boot.test.context.SpringBootTest
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
+class DmiCacheHandlerSpec extends MessagingBaseSpec {
@Autowired
JsonObjectMapper jsonObjectMapper
@@ -49,12 +49,12 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
@SpringBean
InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
@SpringBean
- CmNotificationSubscriptionPersistenceService mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService)
+ CmSubscriptionPersistenceService mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
def testCache = [:]
- def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(mockCmNotificationSubscriptionPersistenceService, testCache, mockInventoryPersistence)
+ def objectUnderTest = new DmiCacheHandler(mockCmSubscriptionPersistenceService, testCache, mockInventoryPersistence)
- CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent
+ NcmpInEvent ncmpInEvent
def yangModelCmHandle1 = new YangModelCmHandle(id:'ch1',dmiServiceName:'dmi-1')
def yangModelCmHandle2 = new YangModelCmHandle(id:'ch2',dmiServiceName:'dmi-2')
def yangModelCmHandle3 = new YangModelCmHandle(id:'ch3',dmiServiceName:'dmi-1')
@@ -67,9 +67,9 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
def 'Load CM subscription event to cache'() {
given: 'a valid subscription event with Id'
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
and: 'list of predicates'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def predicates = ncmpInEvent.getData().getPredicates()
when: 'a valid event object loaded in cache'
objectUnderTest.add(subscriptionId, predicates)
then: 'the cache contains the correct entry with #subscriptionId subscription ID'
@@ -89,19 +89,19 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
given: 'a map as the value for cache entry for some-id'
def testMap = [:]
testMap.put("dmi-1",
- new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.ACCEPTED))
+ new DmiCmSubscriptionDetails([],CmSubscriptionStatus.ACCEPTED))
testMap.put("dmi-2",
- new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.REJECTED))
+ new DmiCmSubscriptionDetails([],CmSubscriptionStatus.REJECTED))
testMap.put("dmi-3",
- new DmiCmNotificationSubscriptionDetails([],CmNotificationSubscriptionStatus.PENDING))
+ new DmiCmSubscriptionDetails([],CmSubscriptionStatus.PENDING))
testCache.put("test-id", testMap)
assert testCache.get("test-id").size() == 3
when: 'the method to remove accepted and rejected entries for test-id is called'
- objectUnderTest.removeAcceptedAndRejectedDmiCmNotificationSubscriptionEntries("test-id")
+ objectUnderTest.removeAcceptedAndRejectedDmiSubscriptionEntries("test-id")
then: 'all entries with status accepted/rejected are no longer present for test-id'
testCache.get("test-id").each { key, testResultMap ->
- assert testResultMap.cmNotificationSubscriptionStatus != CmNotificationSubscriptionStatus.ACCEPTED
- || testResultMap.cmNotificationSubscriptionStatus != CmNotificationSubscriptionStatus.REJECTED
+ assert testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.ACCEPTED
+ || testResultMap.cmSubscriptionStatus != CmSubscriptionStatus.REJECTED
}
and: 'the size of the map for cache entry test-id is as expected'
assert testCache.get("test-id").size() == 1
@@ -109,9 +109,9 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
def 'Create map for DMI cm notification subscription per DMI service name'() {
given: 'list of predicates from the create subscription event'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def predicates = ncmpInEvent.getData().getPredicates()
when: 'method to create map of DMI cm notification subscription per DMI service name is called'
- def result = objectUnderTest.createDmiCmNotificationSubscriptionsPerDmi(predicates)
+ def result = objectUnderTest.createDmiSubscriptionsPerDmi(predicates)
then: 'the result size of resulting map is correct to the number of DMIs'
assert result.size() == 2
and: 'the cache objects per DMI exists'
@@ -120,28 +120,28 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
assert resultMapForDmi1 != null
assert resultMapForDmi2 != null
and: 'the size of predicates in each object is correct'
- assert resultMapForDmi1.dmiCmNotificationSubscriptionPredicates.size() == 2
- assert resultMapForDmi2.dmiCmNotificationSubscriptionPredicates.size() == 2
+ assert resultMapForDmi1.dmiCmSubscriptionPredicates.size() == 2
+ assert resultMapForDmi2.dmiCmSubscriptionPredicates.size() == 2
and: 'the subscription status in each object is correct'
- assert resultMapForDmi1.cmNotificationSubscriptionStatus.toString() == 'PENDING'
- assert resultMapForDmi2.cmNotificationSubscriptionStatus.toString() == 'PENDING'
+ assert resultMapForDmi1.cmSubscriptionStatus.toString() == 'PENDING'
+ assert resultMapForDmi2.cmSubscriptionStatus.toString() == 'PENDING'
and: 'the target cmHandles for each predicate is correct'
- assert resultMapForDmi1.dmiCmNotificationSubscriptionPredicates[0].targetCmHandleIds == ['ch1'].toSet()
- assert resultMapForDmi1.dmiCmNotificationSubscriptionPredicates[1].targetCmHandleIds == ['ch3'].toSet()
+ assert resultMapForDmi1.dmiCmSubscriptionPredicates[0].targetCmHandleIds == ['ch1'].toSet()
+ assert resultMapForDmi1.dmiCmSubscriptionPredicates[1].targetCmHandleIds == ['ch3'].toSet()
- assert resultMapForDmi2.dmiCmNotificationSubscriptionPredicates[0].targetCmHandleIds == ['ch2'].toSet()
- assert resultMapForDmi2.dmiCmNotificationSubscriptionPredicates[1].targetCmHandleIds == ['ch4'].toSet()
+ assert resultMapForDmi2.dmiCmSubscriptionPredicates[0].targetCmHandleIds == ['ch2'].toSet()
+ assert resultMapForDmi2.dmiCmSubscriptionPredicates[1].targetCmHandleIds == ['ch4'].toSet()
and: 'the list of xpath for each is correct'
- assert resultMapForDmi1.dmiCmNotificationSubscriptionPredicates[0].xpaths
- && resultMapForDmi2.dmiCmNotificationSubscriptionPredicates[0].xpaths == ['/x1/y1','x2/y2'].toSet()
+ assert resultMapForDmi1.dmiCmSubscriptionPredicates[0].xpaths
+ && resultMapForDmi2.dmiCmSubscriptionPredicates[0].xpaths == ['/x1/y1', 'x2/y2'].toSet()
- assert resultMapForDmi1.dmiCmNotificationSubscriptionPredicates[1].xpaths
- && resultMapForDmi2.dmiCmNotificationSubscriptionPredicates[1].xpaths == ['/x3/y3','x4/y4'].toSet()
+ assert resultMapForDmi1.dmiCmSubscriptionPredicates[1].xpaths
+ && resultMapForDmi2.dmiCmSubscriptionPredicates[1].xpaths == ['/x3/y3', 'x4/y4'].toSet()
}
def 'Get map for cm handle IDs by DMI service name'() {
given: 'the predicate from the test request CM subscription event'
- def targetFilter = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates().get(0).getTargetFilter()
+ def targetFilter = ncmpInEvent.getData().getPredicates().get(0).getTargetFilter()
when: 'the method to group all target CM handles by DMI service name is called'
def mapOfCMHandleIDsByDmi = objectUnderTest.groupTargetCmHandleIdsByDmi(targetFilter)
then: 'the size of the resulting map is correct'
@@ -153,41 +153,41 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
def 'Update subscription status in cache per DMI service name'() {
given: 'populated cache'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ def predicates = ncmpInEvent.getData().getPredicates()
+ def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription status per dmi is updated in cache'
- objectUnderTest.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmNotificationSubscriptionStatus.ACCEPTED)
+ objectUnderTest.updateDmiSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmSubscriptionStatus.ACCEPTED)
then: 'verify status has been updated in cache'
def predicate = testCache.get(subscriptionId)
- assert predicate.get('dmi-1').cmNotificationSubscriptionStatus == CmNotificationSubscriptionStatus.ACCEPTED
+ assert predicate.get('dmi-1').cmSubscriptionStatus == CmSubscriptionStatus.ACCEPTED
}
def 'Persist Cache into database per dmi'() {
given: 'populated cache'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ def predicates = ncmpInEvent.getData().getPredicates()
+ def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription is persisted in database'
objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
- 4 * mockCmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(_,_,_,subscriptionId)
+ 4 * mockCmSubscriptionPersistenceService.addCmSubscription(_,_,_,subscriptionId)
}
def 'Remove subscription from database per dmi'() {
given: 'populated cache'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ def predicates = ncmpInEvent.getData().getPredicates()
+ def subscriptionId = ncmpInEvent.getData().getSubscriptionId()
objectUnderTest.add(subscriptionId, predicates)
when: 'subscription is persisted in database'
objectUnderTest.removeFromDatabasePerDmi(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
- 4 * mockCmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(_,_,_,subscriptionId)
+ 4 * mockCmSubscriptionPersistenceService.removeCmSubscription(_,_,_,subscriptionId)
}
def setUpTestEvent(){
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
.withId('subscriptionCreated')
@@ -197,7 +197,7 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
def cloudEvent = consumerRecord.value()
- cmNotificationSubscriptionNcmpInEvent = toTargetEvent(cloudEvent, CmNotificationSubscriptionNcmpInEvent.class);
+ ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class);
}
def initialiseMockInventoryPersistenceResponses(){
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
index 0f5d4fe5af..98cc383e7f 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.avc
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.cmavc
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
@@ -42,16 +42,16 @@ import java.time.Duration
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventsPublisher, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
-class AvcEventConsumerSpec extends MessagingBaseSpec {
+class CmAvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher)
+ CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsPublisher)
@Autowired
JsonObjectMapper jsonObjectMapper
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapperSpec.groovy
index ebbaf95578..62d1572502 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/mapper/CmNotificationSubscriptionDmiInEventMapperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventMapperSpec.groovy
@@ -18,10 +18,10 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.mapper
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
import spock.lang.Specification
@@ -29,11 +29,11 @@ import spock.lang.Specification
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
-class CmNotificationSubscriptionDmiInEventMapperSpec extends Specification {
+class DmiInEventMapperSpec extends Specification {
def mockInventoryPersistence = Mock(InventoryPersistence)
- def objectUnderTest = new CmNotificationSubscriptionDmiInEventMapper(mockInventoryPersistence)
+ def objectUnderTest = new DmiInEventMapper(mockInventoryPersistence)
def setup() {
def yangModelCmHandles = [new YangModelCmHandle(id: 'ch-1', dmiProperties: [new YangModelCmHandle.Property('k1', 'v1')], publicProperties: []),
@@ -43,10 +43,10 @@ class CmNotificationSubscriptionDmiInEventMapperSpec extends Specification {
def 'Check for Cm Notification Subscription DMI In Event mapping'() {
given: 'a collection of cm subscription predicates'
- def dmiCmNotificationSubscriptionPredicates = [new DmiCmNotificationSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_RUNNING, ['/ch-1'].toSet()),
- new DmiCmNotificationSubscriptionPredicate(['ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['/ch-2'].toSet())]
+ def dmiSubscriptionPredicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_RUNNING, ['/ch-1'].toSet()),
+ new DmiCmSubscriptionPredicate(['ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['/ch-2'].toSet())]
when: 'we try to map the values'
- def result = objectUnderTest.toCmNotificationSubscriptionDmiInEvent(dmiCmNotificationSubscriptionPredicates)
+ def result = objectUnderTest.toDmiInEvent(dmiSubscriptionPredicates)
then: 'it contains correct cm notification subscription cmhandle object'
assert result.data.cmHandles.cmhandleId.containsAll(['ch-1', 'ch-2'])
assert result.data.cmHandles.privateProperties.containsAll([['k1': 'v1'], ['k2': 'v2']])
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiInEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy
index 253763b13b..34fa4549f5 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiInEventProducerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiInEventProducerSpec.groovy
@@ -18,47 +18,46 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import org.onap.cps.events.EventsPublisher
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.producer.CmNotificationSubscriptionDmiInEventProducer
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmHandle
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.Data
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
import org.onap.cps.ncmp.utils.events.CloudEventMapper
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
-class CmNotificationSubscriptionDmiInEventProducerSpec extends Specification {
+class DmiInEventProducerSpec extends Specification {
def mockEventsPublisher = Mock(EventsPublisher)
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
- def objectUnderTest = new CmNotificationSubscriptionDmiInEventProducer(mockEventsPublisher, jsonObjectMapper)
+ def objectUnderTest = new DmiInEventProducer(mockEventsPublisher, jsonObjectMapper)
def 'Create and Publish Cm Notification Subscription DMI In Event'() {
given: 'a cm subscription for a dmi plugin'
def subscriptionId = 'test-subscription-id'
def dmiPluginName = 'test-dmiplugin'
def eventType = 'subscriptionCreateRequest'
- def cmNotificationSubscriptionDmiInEvent = new CmNotificationSubscriptionDmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])]))
+ def dmiInEvent = new DmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])]))
and: 'also we have target topic for dmiPlugin'
- objectUnderTest.cmNotificationSubscriptionDmiInEventTopic = 'dmiplugin-test-topic'
+ objectUnderTest.dmiInEventTopic = 'dmiplugin-test-topic'
when: 'the event is published'
- objectUnderTest.publishCmNotificationSubscriptionDmiInEvent(subscriptionId, dmiPluginName, eventType, cmNotificationSubscriptionDmiInEvent)
+ objectUnderTest.publishDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)
then: 'the event contains the required attributes'
1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
args ->
{
assert args[0] == 'dmiplugin-test-topic'
assert args[1] == subscriptionId
- def cmNotificationSubscriptionDmiInEventAsCloudEvent = (args[2] as CloudEvent)
- assert cmNotificationSubscriptionDmiInEventAsCloudEvent.getExtension('correlationid') == subscriptionId + '#' + dmiPluginName
- assert cmNotificationSubscriptionDmiInEventAsCloudEvent.type == 'subscriptionCreateRequest'
- assert cmNotificationSubscriptionDmiInEventAsCloudEvent.source.toString() == 'NCMP'
- assert CloudEventMapper.toTargetEvent(cmNotificationSubscriptionDmiInEventAsCloudEvent, CmNotificationSubscriptionDmiInEvent) == cmNotificationSubscriptionDmiInEvent
+ def dmiInEventAsCloudEvent = (args[2] as CloudEvent)
+ assert dmiInEventAsCloudEvent.getExtension('correlationid') == subscriptionId + '#' + dmiPluginName
+ assert dmiInEventAsCloudEvent.type == 'subscriptionCreateRequest'
+ assert dmiInEventAsCloudEvent.source.toString() == 'NCMP'
+ assert CloudEventMapper.toTargetEvent(dmiInEventAsCloudEvent, DmiInEvent) == dmiInEvent
}
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy
index 9b0a48d93e..44e24042a8 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/dmi/DmiOutEventConsumerSpec.groovy
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
@@ -28,19 +28,23 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer.CmNotificationSubscriptionDmiOutEventConsumer
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.REJECTED
+
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpec {
+class DmiOutEventConsumerSpec extends MessagingBaseSpec {
@Autowired
JsonObjectMapper jsonObjectMapper
@@ -48,27 +52,27 @@ class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpe
@Autowired
ObjectMapper objectMapper
- def mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
- def mockCmNotificationSubscriptionEventsHandler = Mock(CmNotificationSubscriptionEventsHandler)
- def mockCmNotificationSubscriptionMappersHandler = Mock(CmNotificationSubscriptionMappersHandler)
+ def mockDmiCacheHandler = Mock(DmiCacheHandler)
+ def mockEventsHandler = Mock(EventsFacade)
+ def mockMappersHandler = Mock(MappersFacade)
- def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer(mockDmiCmNotificationSubscriptionCacheHandler, mockCmNotificationSubscriptionEventsHandler, mockCmNotificationSubscriptionMappersHandler)
+ def objectUnderTest = new DmiOutEventConsumer(mockDmiCacheHandler, mockEventsHandler, mockMappersHandler)
def logger = Spy(ListAppender<ILoggingEvent>)
void setup() {
- ((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionDmiOutEventConsumer.class)).addAppender(logger)
+ ((Logger) LoggerFactory.getLogger(DmiOutEventConsumer.class)).addAppender(logger)
logger.start()
}
void cleanup() {
- ((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionDmiOutEventConsumer.class)).detachAndStopAllAppenders()
+ ((Logger) LoggerFactory.getLogger(DmiOutEventConsumer.class)).detachAndStopAllAppenders()
}
def 'Consume valid CM Subscription response from DMI Plugin'() {
given: 'a cmsubscription event'
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionDmiOutEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionDmiOutEvent.class)
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiOutEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
.withId('random-uuid')
@@ -77,7 +81,7 @@ class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpe
.withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
when: 'the valid event is consumed'
- objectUnderTest.consumeCmNotificationSubscriptionDmiOutEvent(consumerRecord)
+ objectUnderTest.consumeDmiOutEvent(consumerRecord)
then: 'an event is logged with level INFO'
def loggingEvent = getLoggingEvent()
assert loggingEvent.level == Level.INFO
@@ -88,28 +92,28 @@ class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpe
def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() {
given: 'a cmNotificationSubscription event'
def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString())
- def cmNotificationSubscriptionDmiOutEvent = new CmNotificationSubscriptionDmiOutEvent().withData(dmiOutEventData)
+ def dmiOutEvent = new DmiOutEvent().withData(dmiOutEventData)
def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(cmNotificationSubscriptionDmiOutEvent))
+ .withData(objectMapper.writeValueAsBytes(dmiOutEvent))
.withId('random-uuid')
.withType('subscriptionCreateResponse')
.withSource(URI.create('test-dmi-plugin-name'))
.withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
when: 'the event is consumed'
- objectUnderTest.consumeCmNotificationSubscriptionDmiOutEvent(consumerRecord)
+ objectUnderTest.consumeDmiOutEvent(consumerRecord)
then: 'correct number of calls to cache'
- expectedCacheCalls * mockDmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ expectedCacheCalls * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
and: 'correct number of calls to persist cache'
- expectedPersistenceCalls * mockDmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
+ expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
and: 'correct number of calls to map the ncmp out event'
- 1 * mockCmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent('sub-1', _)
+ 1 * mockMappersHandler.toNcmpOutEvent('sub-1', _)
and: 'correct number of calls to publish the ncmp out event to client'
- 1 * mockCmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
+ 1 * mockEventsHandler.publishNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
where: 'the following parameters are used'
- scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
- 'Accepted Status' | CmNotificationSubscriptionStatus.ACCEPTED | '1' || 1 | 1
- 'Rejected Status' | CmNotificationSubscriptionStatus.REJECTED | '104' || 1 | 0
+ scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
+ 'Accepted Status' | ACCEPTED | '1' || 1 | 1
+ 'Rejected Status' | REJECTED | '104' || 1 | 0
}
def getLoggingEvent() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparatorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparatorSpec.groovy
new file mode 100644
index 0000000000..0ebf9a6aed
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionComparatorSpec.groovy
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+
+
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
+
+class CmSubscriptionComparatorSpec extends Specification {
+
+ def mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService)
+ def objectUnderTest = new CmSubscriptionComparator(mockCmSubscriptionPersistenceService)
+
+ def 'Find Delta of given list of predicates'() {
+ given: 'A list of predicates'
+ def predicates = [new DmiCmSubscriptionPredicate(['ch-1', 'ch-2'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/', 'b/2'].toSet())]
+ and: '3 positive responses and 1 negative.'
+ mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
+ mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'b/2') >>> true
+ mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-2', 'a/1/') >>> true
+ mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-2', 'b/2') >>> false
+ when: 'getDelta is called'
+ def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
+ then: 'verify correct delta is returned'
+ assert result.size() == 1
+ assert result[0].targetCmHandleIds[0] == 'ch-2'
+ assert result[0].xpaths[0] == 'b/2'
+
+ }
+
+ def 'Find Delta of given list of predicates when it is an ongoing Cm Subscription'() {
+ given: 'A list of predicates'
+ def predicates = [new DmiCmSubscriptionPredicate(['ch-1'].toSet(), PASSTHROUGH_OPERATIONAL, ['a/1/'].toSet())]
+ and: 'its already present'
+ mockCmSubscriptionPersistenceService.isOngoingCmSubscription(PASSTHROUGH_OPERATIONAL, 'ch-1', 'a/1/') >>> true
+ when: 'getDelta is called'
+ def result = objectUnderTest.getNewDmiSubscriptionPredicates(predicates)
+ then: 'verify correct delta is returned'
+ assert result.size() == 0
+ }
+
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
new file mode 100644
index 0000000000..caefdeea1d
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/CmSubscriptionHandlerImplSpec.groovy
@@ -0,0 +1,145 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.utils.CmSubscriptionPersistenceService
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING
+
+class CmSubscriptionHandlerImplSpec extends Specification {
+
+ def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+ def mockCmSubscriptionPersistenceService = Mock(CmSubscriptionPersistenceService);
+ def mockCmSubscriptionComparator = Mock(CmSubscriptionComparator);
+ def mockMappersFacade = Mock(MappersFacade);
+ def mockEventsFacade = Mock(EventsFacade);
+ def mockDmiCacheHandler = Mock(DmiCacheHandler);
+
+ def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService,
+ mockCmSubscriptionComparator, mockMappersFacade,
+ mockEventsFacade, mockDmiCacheHandler)
+
+ def testDmiSubscriptionsPerDmi = ["dmi-1": new DmiCmSubscriptionDetails([], PENDING)]
+
+ def 'Consume valid and unique CmNotificationSubscriptionNcmpInEvent create message'() {
+ given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
+ def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
+ def testListOfDeltaPredicates = [new DmiCmSubscriptionPredicate(['ch1'].toSet(), PASSTHROUGH_OPERATIONAL, ['/a/b'].toSet())]
+ mockCmSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
+ and: 'relevant details is extracted from the event'
+ def subscriptionId = testEventConsumed.getData().getSubscriptionId()
+ def predicates = testEventConsumed.getData().getPredicates()
+ and: 'the cache handler returns for relevant subscription id'
+ 1 * mockDmiCacheHandler.get("test-id") >> testDmiSubscriptionsPerDmi
+ and: 'the delta predicates is returned'
+ 1 * mockCmSubscriptionComparator.getNewDmiSubscriptionPredicates(_) >> testListOfDeltaPredicates
+ and: 'the DMI in event mapper returns cm notification subscription event'
+ def testDmiInEvent = new DmiInEvent()
+ 1 * mockMappersFacade
+ .toDmiInEvent(testListOfDeltaPredicates) >> testDmiInEvent
+ when: 'the valid and unique event is consumed'
+ objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
+ then: 'the subscription cache handler is called once'
+ 1 * mockDmiCacheHandler.add('test-id', _)
+ and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
+ testDmiSubscriptionsPerDmi.size() * mockEventsFacade.publishDmiInEvent(
+ "test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent)
+ and: 'we schedule to send the response after configured time from the cache'
+ 1 * mockEventsFacade.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+ }
+
+ def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() {
+ given: 'a cmNotificationSubscriptionNcmp in event with unique subscription id'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
+ def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
+ def noDeltaPredicates = []
+ mockCmSubscriptionPersistenceService.isUniqueSubscriptionId("test-id") >> true
+ and: 'the cache handler returns for relevant subscription id'
+ 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
+ and: 'the delta predicates is returned'
+ 1 * mockCmSubscriptionComparator.getNewDmiSubscriptionPredicates(_) >> noDeltaPredicates
+ when: 'the valid and unique event is consumed'
+ objectUnderTest.processSubscriptionCreateRequest('test-id', noDeltaPredicates)
+ then: 'the subscription cache handler is called once'
+ 1 * mockDmiCacheHandler.add('test-id', _)
+ and: 'the subscription details are updated in the cache'
+ 1 * mockDmiCacheHandler.updateDmiSubscriptionStatusPerDmi('test-id', _, ACCEPTED)
+ and: 'we schedule to send the response after configured time from the cache'
+ 1 * mockEventsFacade.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+ }
+
+ def 'Consume valid and but non-unique CmNotificationSubscription create message'() {
+ given: 'a cmNotificationSubscriptionNcmp in event'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
+ def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
+ mockCmSubscriptionPersistenceService.isUniqueSubscriptionId('test-id') >> false
+ and: 'relevant details is extracted from the event'
+ def subscriptionId = testEventConsumed.getData().getSubscriptionId()
+ def predicates = testEventConsumed.getData().getPredicates()
+ and: 'the NCMP out in event mapper returns an event for rejected request'
+ def testNcmpOutEvent = new NcmpOutEvent()
+ 1 * mockMappersFacade.toNcmpOutEventForRejectedRequest(
+ "test-id", _) >> testNcmpOutEvent
+ when: 'the valid but non-unique event is consumed'
+ objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
+ then: 'the events handler method to publish DMI event is never called'
+ 0 * mockEventsFacade.publishDmiInEvent(_, _, _, _)
+ and: 'the events handler method to publish NCMP out event is called once'
+ 1 * mockEventsFacade.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
+ }
+
+ def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
+ given: 'a cmNotificationSubscriptionNcmp in event for delete'
+ def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
+ def testEventConsumed = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
+ and: 'relevant details is extracted from the event'
+ def subscriptionId = testEventConsumed.getData().getSubscriptionId()
+ def predicates = testEventConsumed.getData().getPredicates()
+ and: 'the cache handler returns for relevant subscription id'
+ 1 * mockDmiCacheHandler.get('test-id') >> testDmiSubscriptionsPerDmi
+ when: 'the valid and unique event is consumed'
+ objectUnderTest.processSubscriptionDeleteRequest(subscriptionId, predicates)
+ then: 'the subscription cache handler is called once'
+ 1 * mockDmiCacheHandler.add('test-id', predicates)
+ and: 'the mapper handler to get DMI in event is called once'
+ 1 * mockMappersFacade.toDmiInEvent(_)
+ and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
+ testDmiSubscriptionsPerDmi.size() * mockEventsFacade.publishDmiInEvent(
+ 'test-id', 'dmi-1', 'subscriptionDeleteRequest', _)
+ and: 'we schedule to send the response after configured time from the cache'
+ 1 * mockEventsFacade.publishNcmpOutEvent('test-id', 'subscriptionDeleteResponse', null, true)
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy
index 01a92c02fa..637e9ebfec 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionNcmpInEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpInEventConsumerSpec.groovy
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
@@ -28,10 +28,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer.CmNotificationSubscriptionNcmpInEventConsumer
-import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionHandlerService
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
@@ -39,10 +37,10 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class CmNotificationSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpec {
+class NcmpInEventConsumerSpec extends MessagingBaseSpec {
- def mockCmNotificationSubscriptionHandlerService = Mock(CmNotificationSubscriptionHandlerService)
- def objectUnderTest = new CmNotificationSubscriptionNcmpInEventConsumer(mockCmNotificationSubscriptionHandlerService)
+ def mockCmSubscriptionHandler = Mock(CmSubscriptionHandler)
+ def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
def logger = Spy(ListAppender<ILoggingEvent>)
@Autowired
@@ -52,19 +50,19 @@ class CmNotificationSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpe
ObjectMapper objectMapper
void setup() {
- ((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionNcmpInEventConsumer.class)).addAppender(logger)
+ ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).addAppender(logger)
logger.start()
}
void cleanup() {
- ((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionNcmpInEventConsumer.class)).detachAndStopAllAppenders()
+ ((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders()
}
def 'Consume valid CmNotificationSubscriptionNcmpInEvent create message'() {
given: 'a cmNotificationSubscription event'
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
.withId('subscriptionCreated')
@@ -80,13 +78,13 @@ class CmNotificationSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpe
and: 'the log indicates the task completed successfully'
assert loggingEvent.formattedMessage == 'Subscription create request for source some-resource with subscription id test-id ...'
and: 'the subscription handler service is called once'
- 1 * mockCmNotificationSubscriptionHandlerService.processSubscriptionCreateRequest('test-id',_)
+ 1 * mockCmSubscriptionHandler.processSubscriptionCreateRequest('test-id',_)
}
def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
given: 'a cmNotificationSubscription event'
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
+ def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent.class)
def testCloudEventSent = CloudEventBuilder.v1()
.withData(objectMapper.writeValueAsBytes(testEventSent))
.withId('sub-id')
@@ -102,7 +100,7 @@ class CmNotificationSubscriptionNcmpInEventConsumerSpec extends MessagingBaseSpe
and: 'the log indicates the task completed successfully'
assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...'
and: 'the subscription handler service is called once'
- 1 * mockCmNotificationSubscriptionHandlerService.processSubscriptionDeleteRequest('test-id',_)
+ 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id',_)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapperSpec.groovy
new file mode 100644
index 0000000000..2251a33466
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventMapperSpec.groovy
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+
+
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionPredicate
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
+import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.PENDING
+import static org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus.REJECTED
+
+class NcmpOutEventMapperSpec extends Specification {
+
+ static Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi
+
+ def objectUnderTest = new NcmpOutEventMapper()
+
+ def setup() {
+ def dmiSubscriptionPredicateA = new DmiCmSubscriptionPredicate(['ch-A'] as Set, PASSTHROUGH_RUNNING, ['/a'] as Set)
+ def dmiSubscriptionPredicateB = new DmiCmSubscriptionPredicate(['ch-B'] as Set, PASSTHROUGH_OPERATIONAL, ['/b'] as Set)
+ def dmiSubscriptionPredicateC = new DmiCmSubscriptionPredicate(['ch-C'] as Set, PASSTHROUGH_OPERATIONAL, ['/c'] as Set)
+ dmiSubscriptionsPerDmi = ['dmi-1': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateA], PENDING),
+ 'dmi-2': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateB], ACCEPTED),
+ 'dmi-3': new DmiCmSubscriptionDetails([dmiSubscriptionPredicateC], REJECTED)
+ ]
+ }
+
+ def 'Check for Cm Notification Subscription Outgoing event mapping'() {
+ when: 'we try to map the event to send it to client'
+ def result = objectUnderTest.toNcmpOutEvent('test-subscription', dmiSubscriptionsPerDmi)
+ then: 'event is mapped correctly for the subscription'
+ result.data.subscriptionId == 'test-subscription'
+ and: 'the cm handle ids are part of correct list'
+ result.data.pendingTargets == ['ch-A']
+ result.data.acceptedTargets == ['ch-B']
+ result.data.rejectedTargets == ['ch-C']
+ }
+
+ def 'Check for Cm Notification Rejected Subscription Outgoing event mapping'() {
+ when: 'we try to map the event to send it to client'
+ def result = objectUnderTest.toNcmpOutEventForRejectedRequest('test-subscription', ['ch-1', 'ch-2'])
+ then: 'event is mapped correctly for the subscription id'
+ result.data.subscriptionId == 'test-subscription'
+ and: 'the cm handle ids are part of correct list'
+ result.data.withRejectedTargets(['ch-1', 'ch-2'])
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy
new file mode 100644
index 0000000000..f96f3df786
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/ncmp/NcmpOutEventProducerSpec.groovy
@@ -0,0 +1,89 @@
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade
+import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent
+import org.onap.cps.ncmp.utils.events.CloudEventMapper
+import org.onap.cps.utils.JsonObjectMapper
+import spock.lang.Specification
+
+class NcmpOutEventProducerSpec extends Specification {
+
+ def mockEventsPublisher = Mock(EventsPublisher)
+ def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+ def mockCmNotificationSubscriptionMappersHandler = Mock(MappersFacade)
+ def mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCacheHandler)
+
+ def objectUnderTest = new NcmpOutEventProducer(mockEventsPublisher, jsonObjectMapper,
+ mockCmNotificationSubscriptionMappersHandler, mockDmiCmNotificationSubscriptionCacheHandler)
+
+ def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
+ given: 'a cm subscription response for the client'
+ def subscriptionId = 'test-subscription-id-2'
+ def eventType = 'subscriptionCreateResponse'
+ def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2']))
+ and: 'also we have target topic for publishing to client'
+ objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
+ and: 'a deadline to an event'
+ objectUnderTest.dmiOutEventTimeoutInMs = 1000
+ when: 'the event is published'
+ objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled)
+ then: 'we conditionally wait for a while'
+ Thread.sleep(delayInMs)
+ then: 'the event contains the required attributes'
+ 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+ args ->
+ {
+ assert args[0] == 'client-test-topic'
+ assert args[1] == subscriptionId
+ def ncmpOutEventAsCloudEvent = (args[2] as CloudEvent)
+ assert ncmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
+ assert ncmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
+ assert ncmpOutEventAsCloudEvent.source.toString() == 'NCMP'
+ assert CloudEventMapper.toTargetEvent(ncmpOutEventAsCloudEvent, NcmpOutEvent) == ncmpOutEvent
+ }
+ }
+ where: 'following scenarios are considered'
+ scenario | delayInMs | eventPublishingTaskToBeScheduled
+ 'publish event now' | 0 | false
+ 'schedule and publish after the configured time ' | 1500 | true
+ }
+
+ def 'Schedule Cm Notification Subscription NCMP out event but later publish it on demand'() {
+ given: 'a cm subscription response for the client'
+ def subscriptionId = 'test-subscription-id-3'
+ def eventType = 'subscriptionCreateResponse'
+ def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3']))
+ and: 'also we have target topic for publishing to client'
+ objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
+ and: 'a deadline to an event'
+ objectUnderTest.dmiOutEventTimeoutInMs = 1000
+ when: 'the event is scheduled to be published'
+ objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
+ then: 'we wait for 10ms and then we receive response from DMI'
+ Thread.sleep(10)
+ and: 'we receive response from DMI so we publish the message on demand'
+ objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
+ then: 'the event contains the required attributes'
+ 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+ args ->
+ {
+ assert args[0] == 'client-test-topic'
+ assert args[1] == subscriptionId
+ def ncmpOutEventAsCloudEvent = (args[2] as CloudEvent)
+ assert ncmpOutEventAsCloudEvent.getExtension('correlationid') == subscriptionId
+ assert ncmpOutEventAsCloudEvent.type == 'subscriptionCreateResponse'
+ assert ncmpOutEventAsCloudEvent.source.toString() == 'NCMP'
+ assert CloudEventMapper.toTargetEvent(ncmpOutEventAsCloudEvent, NcmpOutEvent) == ncmpOutEvent
+ }
+ }
+ then: 'the cache handler is called once to remove accepted and rejected entries in cache'
+ 1 * mockDmiCmNotificationSubscriptionCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId)
+ }
+
+
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy
index ef735fd82a..d32d143ade 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/utils/CmSubscriptionPersistenceServiceSpec.groovy
@@ -19,38 +19,40 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.api.impl.events.cmsubscription.service
+package org.onap.cps.ncmp.impl.cmnotificationsubscription.utils
import com.fasterxml.jackson.databind.ObjectMapper
import org.onap.cps.api.CpsDataService
import org.onap.cps.api.CpsQueryService
-import org.onap.cps.ncmp.api.data.models.DatastoreType
-import org.onap.cps.spi.FetchDescendantsOption
import org.onap.cps.spi.model.DataNode
import org.onap.cps.utils.ContentType
import org.onap.cps.utils.JsonObjectMapper
import spock.lang.Specification
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceServiceImpl.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceServiceImpl.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH
-import static org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceServiceImpl.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID
+import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE
+import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH
+import static CmSubscriptionPersistenceService.CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID
+import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_OPERATIONAL
+import static org.onap.cps.ncmp.api.data.models.DatastoreType.PASSTHROUGH_RUNNING
+import static org.onap.cps.spi.FetchDescendantsOption.DIRECT_CHILDREN_ONLY
+import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS
-class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification {
+class CmSubscriptionPersistenceServiceSpec extends Specification {
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
def mockCpsQueryService = Mock(CpsQueryService)
def mockCpsDataService = Mock(CpsDataService)
- def objectUnderTest = new CmNotificationSubscriptionPersistenceServiceImpl(jsonObjectMapper, mockCpsQueryService, mockCpsDataService)
+ def objectUnderTest = new CmSubscriptionPersistenceService(jsonObjectMapper, mockCpsQueryService, mockCpsDataService)
def 'Check ongoing cm subscription #scenario'() {
given: 'a valid cm subscription query'
- def cpsPathQuery = "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='/cps/path']";
+ def cpsPathQuery = "/datastores/datastore[@name='ncmp-datastore:passthrough-running']/cm-handles/cm-handle[@id='ch-1']/filters/filter[@xpath='/cps/path']"
and: 'datanodes optionally returned'
1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> dataNode
+ cpsPathQuery, OMIT_DESCENDANTS) >> dataNode
when: 'we check for an ongoing cm subscription'
- def response = objectUnderTest.isOngoingCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/cps/path')
+ def response = objectUnderTest.isOngoingCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/cps/path')
then: 'we get expected response'
assert response == isOngoingCmSubscription
where: 'following scenarios are used'
@@ -63,7 +65,7 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
given: 'a cps path with a subscription ID for querying'
def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_ID.formatted('some-sub')
and: 'relevant datanodes are returned'
- 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >>
+ 1 * mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', cpsPathQuery, OMIT_DESCENDANTS) >>
dataNodes
when: 'a subscription ID is tested for uniqueness'
def result = objectUnderTest.isUniqueSubscriptionId('some-sub')
@@ -77,18 +79,18 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
def 'Add new subscriber to an ongoing cm notification subscription'() {
given: 'a valid cm subscription path query'
- def cpsPathQuery =CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
+ def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
and: 'a dataNode exists for the given cps path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1']])]
+ cpsPathQuery, OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1']])]
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId')
+ objectUnderTest.addCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'newSubId')
then: 'data service method to update list of subscribers is called once'
1 * mockCpsDataService.updateNodeLeaves(
'NCMP-Admin',
'cm-data-subscriptions',
'/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
- objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-1','newSubId']), _,ContentType.JSON)
+ objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-1', 'newSubId']), _, ContentType.JSON)
}
def 'Add new cm notification subscription for #datastoreType'() {
@@ -100,25 +102,25 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
and: 'a datanode does not exist for cm subscription path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
cmSubscriptionCpsPathQuery,
- FetchDescendantsOption.OMIT_DESCENDANTS) >> []
+ OMIT_DESCENDANTS) >> []
and: 'a datanode does not exist for the given cm handle subscription path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmHandleForSubscriptionPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> []
+ cmHandleForSubscriptionPathQuery, OMIT_DESCENDANTS) >> []
and: 'subscription is mapped as JSON'
def subscriptionAsJson = '{"cm-handle":[{"id":"ch-1","filters":' +
objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']) + '}]}'
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId')
+ objectUnderTest.addCmSubscription(datastoreType, 'ch-1', '/x/y', 'newSubId')
then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
1 * mockCpsDataService.saveData(
'NCMP-Admin',
'cm-data-subscriptions',
parentNodeXpath.formatted(datastoreName),
- subscriptionAsJson,_, ContentType.JSON)
+ subscriptionAsJson, _, ContentType.JSON)
where:
- scenario | datastoreType || datastoreName
- 'passthrough_running' | DatastoreType.PASSTHROUGH_RUNNING || "ncmp-datastore:passthrough-running"
- 'passthrough_operational' | DatastoreType.PASSTHROUGH_OPERATIONAL || "ncmp-datastore:passthrough-operational"
+ scenario | datastoreType || datastoreName
+ 'passthrough_running' | PASSTHROUGH_RUNNING || 'ncmp-datastore:passthrough-running'
+ 'passthrough_operational' | PASSTHROUGH_OPERATIONAL || 'ncmp-datastore:passthrough-operational'
}
def 'Add new cm notification subscription when xpath does not exist for existing subscription cm handle'() {
@@ -129,52 +131,49 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
def parentNodeXpath = '/datastores/datastore[@name=\'%s\']/cm-handles/cm-handle[@id=\'%s\']/filters'
and: 'a datanode does not exist for cm subscription path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmSubscriptionCpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> []
+ cmSubscriptionCpsPathQuery, OMIT_DESCENDANTS) >> []
and: 'a datanode exists for the given cm handle subscription path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cmHandleForSubscriptionPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode()]
+ cmHandleForSubscriptionPathQuery, OMIT_DESCENDANTS) >> [new DataNode()]
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId')
+ objectUnderTest.addCmSubscription(datastoreType, 'ch-1', '/x/y', 'newSubId')
then: 'data service method to create new subscription for given subscriber is called once with the correct parameters'
1 * mockCpsDataService.saveListElements(
'NCMP-Admin',
'cm-data-subscriptions',
parentNodeXpath.formatted(datastoreName, 'ch-1'),
- objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']),_)
+ objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['newSubId']), _)
where:
- scenario | datastoreType || datastoreName
- 'passthrough_running' | DatastoreType.PASSTHROUGH_RUNNING || "ncmp-datastore:passthrough-running"
- 'passthrough_operational' | DatastoreType.PASSTHROUGH_OPERATIONAL || "ncmp-datastore:passthrough-operational"
+ scenario | datastoreType || datastoreName
+ 'passthrough_running' | PASSTHROUGH_RUNNING || 'ncmp-datastore:passthrough-running'
+ 'passthrough_operational' | PASSTHROUGH_OPERATIONAL || 'ncmp-datastore:passthrough-operational'
}
def 'Remove subscriber from a list of an ongoing cm notification subscription'() {
given: 'a subscription exists when queried'
def cpsPathQuery = CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
- cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1', 'sub-2']])]
+ cpsPathQuery, OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1', 'sub-2']])]
when: 'the subscriber is removed'
- objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+ objectUnderTest.removeCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
then: 'the list of subscribers is updated'
1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-subscriptions',
'/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
objectUnderTest.getSubscriptionDetailsAsJson('/x/y', ['sub-2']), _, ContentType.JSON)
}
- def 'Removing last ongoing subscription for datastore and cmhandle and xpath'(){
+ def 'Removing last ongoing subscription for datastore and cmhandle and xpath'() {
given: 'a subscription exists when queried but has only 1 subscriber'
mockCpsQueryService.queryDataNodes(
- 'NCMP-Admin',
- 'cm-data-subscriptions',
+ 'NCMP-Admin', 'cm-data-subscriptions',
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_WITH_DATASTORE_CMHANDLE_AND_XPATH.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y'),
- FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1']])]
+ OMIT_DESCENDANTS) >> [new DataNode(leaves: ['xpath': '/x/y', 'subscriptionIds': ['sub-1']])]
and: 'the #scenario'
- mockCpsQueryService.queryDataNodes(
- 'NCMP-Admin',
- 'cm-data-subscriptions',
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
CPS_PATH_QUERY_FOR_CM_SUBSCRIPTION_FILTERS_WITH_DATASTORE_AND_CMHANDLE.formatted('ncmp-datastore:passthrough-running', 'ch-1'),
- FetchDescendantsOption.DIRECT_CHILDREN_ONLY) >> [new DataNode(childDataNodes: listOfChildNodes)]
+ DIRECT_CHILDREN_ONLY) >> [new DataNode(childDataNodes: listOfChildNodes)]
when: 'that last ongoing subscription is removed'
- objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+ objectUnderTest.removeCmSubscription(PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
then: 'the subscription with empty subscriber list is removed'
1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
'/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters/filter[@xpath=\'/x/y\']',
@@ -184,9 +183,9 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
'/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']',
_)
where:
- scenario | listOfChildNodes || numberOfCallsToDeleteCmHandle
- 'cm handle in same datastore is used for other subscriptions' | [new DataNode()] || 0
- 'cm handle in same datastore is NOT used for other subscriptions' | [] || 1
+ scenario | listOfChildNodes || numberOfCallsToDeleteCmHandle
+ 'cm handle in same datastore is used for other subscriptions' | [new DataNode()] || 0
+ 'cm handle in same datastore is NOT used for other subscriptions' | [] || 1
}
}