aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java66
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java51
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java51
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java82
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java44
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java37
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java8
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy4
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy75
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy79
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy19
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy38
12 files changed, 478 insertions, 76 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
index 0b67266375..ff7afc9eb7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
@@ -20,17 +20,13 @@
package org.onap.cps.ncmp.api.impl.config.embeddedcache;
-import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.NamedConfig;
import com.hazelcast.config.QueueConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.cache.HazelcastCacheConfig;
import org.onap.cps.spi.model.DataNode;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -39,18 +35,12 @@ import org.springframework.context.annotation.Configuration;
*/
@Slf4j
@Configuration
-public class SynchronizationCacheConfig {
+public class SynchronizationCacheConfig extends HazelcastCacheConfig {
public static final int MODULE_SYNC_STARTED_TTL_SECS = 600;
public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800;
- @Value("${hazelcast.mode.kubernetes.enabled}")
- private boolean cacheKubernetesEnabled;
-
- @Value("${hazelcast.mode.kubernetes.service-name}")
- private String cacheKubernetesServiceName;
-
- private static final QueueConfig commonQueueConfig = createQueueConfig();
+ private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig");
private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig");
private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig");
@@ -61,7 +51,8 @@ public class SynchronizationCacheConfig {
*/
@Bean
public BlockingQueue<DataNode> moduleSyncWorkQueue() {
- return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig)
+ return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig,
+ "synchronization-caches")
.getQueue("moduleSyncWorkQueue");
}
@@ -72,7 +63,8 @@ public class SynchronizationCacheConfig {
*/
@Bean
public IMap<String, Object> moduleSyncStartedOnCmHandles() {
- return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig)
+ return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig,
+ "synchronization-caches")
.getMap("moduleSyncStartedOnCmHandles");
}
@@ -83,48 +75,8 @@ public class SynchronizationCacheConfig {
*/
@Bean
public IMap<String, Boolean> dataSyncSemaphores() {
- return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig)
+ return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig,
+ "synchronization-caches")
.getMap("dataSyncSemaphores");
}
-
- private HazelcastInstance createHazelcastInstance(
- final String hazelcastInstanceName, final NamedConfig namedConfig) {
- return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
- }
-
- private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
- final Config config = new Config(instanceName);
- if (namedConfig instanceof MapConfig) {
- config.addMapConfig((MapConfig) namedConfig);
- }
- if (namedConfig instanceof QueueConfig) {
- config.addQueueConfig((QueueConfig) namedConfig);
- }
- config.setClusterName("synchronization-caches");
- updateDiscoveryMode(config);
- return config;
- }
-
- private static QueueConfig createQueueConfig() {
- final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig");
- commonQueueConfig.setBackupCount(3);
- commonQueueConfig.setAsyncBackupCount(3);
- return commonQueueConfig;
- }
-
- private static MapConfig createMapConfig(final String configName) {
- final MapConfig mapConfig = new MapConfig(configName);
- mapConfig.setBackupCount(3);
- mapConfig.setAsyncBackupCount(3);
- return mapConfig;
- }
-
- private void updateDiscoveryMode(final Config config) {
- if (cacheKubernetesEnabled) {
- log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName);
- config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
- .setProperty("service-name", cacheKubernetesServiceName);
- }
- }
-
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java
new file mode 100644
index 0000000000..d2c3dc2599
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.map.IMap;
+import java.util.Set;
+import org.onap.cps.cache.HazelcastCacheConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Core infrastructure of the hazelcast distributed cache for subscription forward config use cases.
+ */
+@Configuration
+public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig {
+
+ private static final MapConfig forwardedSubscriptionEventCacheMapConfig =
+ createMapConfig("forwardedSubscriptionEventCacheMapConfig");
+
+ /**
+ * Distributed instance of forwarded subscription information cache that contains subscription event
+ * id by dmi names as properties.
+ *
+ * @return configured map of subscription event ids as keys to sets of dmi names for values
+ */
+ @Bean
+ public IMap<String, Set<String>> forwardedSubscriptionEventCache() {
+ return createHazelcastInstance("hazelCastInstanceSubscriptionEvents",
+ forwardedSubscriptionEventCacheMapConfig, "cps-ncmp-service-caches")
+ .getMap("forwardedSubscriptionEventCache");
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java
new file mode 100644
index 0000000000..e7edecfacc
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import com.hazelcast.map.IMap;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RequiredArgsConstructor
+public class ResponseTimeoutTask implements Runnable {
+
+ private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
+ private final String subscriptionEventId;
+
+ @Override
+ public void run() {
+ if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
+ final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
+ if (dmiNames.isEmpty()) {
+ //TODO full outcome response here
+ log.info("placeholder to create full outcome response for subscriptionEventId: {}.",
+ subscriptionEventId);
+ } else {
+ //TODO partial outcome response here
+ log.info("placeholder to create partial outcome response for subscriptionEventId: {}.",
+ subscriptionEventId);
+ }
+ forwardedSubscriptionEventCache.remove(subscriptionEventId);
+ }
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java
new file mode 100644
index 0000000000..b332ad1a0e
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import com.hazelcast.map.IMap;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class SubscriptionEventResponseConsumer {
+
+ private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
+
+ @Value("${app.ncmp.avc.subscription-outcome-topic}")
+ private String subscriptionOutcomeEventTopic;
+
+ @Value("${notification.enabled:true}")
+ private boolean notificationFeatureEnabled;
+
+ @Value("${ncmp.model-loader.subscription:false}")
+ private boolean subscriptionModelLoaderEnabled;
+
+ /**
+ * Consume subscription response event.
+ *
+ * @param subscriptionEventResponse the event to be consumed
+ */
+ @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
+ properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
+ public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) {
+ log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId());
+ final String subscriptionEventId = subscriptionEventResponse.getClientId()
+ + subscriptionEventResponse.getSubscriptionName();
+ final boolean createOutcomeResponse;
+ if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
+ forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName());
+ createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
+ if (createOutcomeResponse) {
+ forwardedSubscriptionEventCache.remove(subscriptionEventId);
+ }
+ } else {
+ createOutcomeResponse = true;
+ }
+ if (subscriptionModelLoaderEnabled) {
+ updateSubscriptionEvent(subscriptionEventResponse);
+ }
+ if (createOutcomeResponse && notificationFeatureEnabled) {
+ log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId);
+ //TODO Create outcome response
+ }
+ }
+
+ private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
+ log.info("placeholder to update persisted subscription for subscriptionEventId: {}.",
+ subscriptionEventResponse.getClientId() + subscriptionEventResponse.getSubscriptionName());
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 1fb72a59d3..4afa051d30 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -20,20 +20,28 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.event.model.SubscriptionEvent;
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -44,8 +52,15 @@ public class SubscriptionEventForwarder {
private final InventoryPersistence inventoryPersistence;
private final EventsPublisher<SubscriptionEvent> eventsPublisher;
+ private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
+
+ private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-";
+ @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}")
+ private int dmiResponseTimeoutInMs;
+
/**
* Forward subscription event.
*
@@ -65,15 +80,32 @@ public class SubscriptionEventForwarder {
final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
= DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
- dmiPropertiesPerCmHandleIdPerServiceName.forEach((dmiServiceName, cmHandlePropertiesMap) -> {
- subscriptionEvent.getEvent().getPredicates().setTargets(Collections
- .singletonList(cmHandlePropertiesMap));
- final String eventKey = createEventKey(subscriptionEvent, dmiServiceName);
- eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiServiceName, eventKey,
- subscriptionEvent);
+
+ final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
+ startResponseTimeout(subscriptionEvent, dmisToRespond);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent);
+ }
+
+ private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
+ final SubscriptionEvent subscriptionEvent) {
+ dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
+ subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap));
+ final String eventKey = createEventKey(subscriptionEvent, dmiName);
+ eventsPublisher.publishEvent(
+ DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent);
});
}
+ private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
+ final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID()
+ + subscriptionEvent.getEvent().getSubscription().getName();
+
+ forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond);
+ final ResponseTimeoutTask responseTimeoutTask =
+ new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId);
+ executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
+ }
+
private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) {
return subscriptionEvent.getEvent().getSubscription().getClientID()
+ "-"
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java
new file mode 100644
index 0000000000..95e773c8c9
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Getter
+@Setter
+public class SubscriptionEventResponse {
+ private String clientId;
+ private String subscriptionName;
+ private String dmiName;
+ private Map<String, SubscriptionStatus> cmHandleIdToStatus;
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java
index 5a418fd312..659779acf9 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java
@@ -52,10 +52,10 @@ public class SubscriptionModelLoader implements ModelLoader {
private static final String SUBSCRIPTION_SCHEMASET_NAME = "subscriptions";
private static final String SUBSCRIPTION_REGISTRY_DATANODE_NAME = "subscription-registry";
- @Value("${ncmp.model-loader.maximumAttemptCount:20}")
+ @Value("${ncmp.model-loader.maximum-attempt-count:20}")
private int maximumAttemptCount;
- @Value("${ncmp.model-loader.retryTimeMs:1000}")
+ @Value("${ncmp.timers.model-loader.retry-time-ms:1000}")
private long retryTimeMs;
@Value("${ncmp.model-loader.subscription:false}")
@@ -99,7 +99,7 @@ public class SubscriptionModelLoader implements ModelLoader {
}
} else {
throw new NcmpStartUpException("Retrieval of NCMP dataspace fails",
- "NCMP dataspace does not exist");
+ "NCMP dataspace does not exist");
}
}
}
@@ -139,7 +139,7 @@ public class SubscriptionModelLoader implements ModelLoader {
*/
@Override
public boolean createAnchor(final String dataspaceName, final String schemaSetName,
- final String anchorName) {
+ final String anchorName) {
try {
cpsAdminService.createAnchor(dataspaceName, schemaSetName, anchorName);
} catch (final AlreadyDefinedException exception) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
index 6829d834d2..567debdded 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
@@ -51,8 +51,8 @@ class SynchronizationCacheConfigSpec extends Specification {
assert null != moduleSyncStartedOnCmHandles
and: 'system is able to create an instance of a map to hold data sync semaphores'
assert null != dataSyncSemaphores
- and: 'there 3 instances'
- assert Hazelcast.allHazelcastInstances.size() == 3
+ and: 'there are at least 3 instances'
+ assert Hazelcast.allHazelcastInstances.size() > 2
and: 'they have the correct names (in any order)'
assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' )
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy
new file mode 100644
index 0000000000..7448daf598
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc
+
+import com.hazelcast.config.Config
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.map.IMap
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ForwardedSubscriptionEventCacheConfig])
+class ForwardedSubscriptionEventCacheConfigSpec extends Specification {
+
+ @Autowired
+ private IMap<String, Set<String>> forwardedSubscriptionEventCache
+
+ def 'Embedded (hazelcast) cache for Forwarded Subscription Event Cache.'() {
+ expect: 'system is able to create an instance of the Forwarded Subscription Event Cache'
+ assert null != forwardedSubscriptionEventCache
+ and: 'there is at least 1 instance'
+ assert Hazelcast.allHazelcastInstances.size() > 0
+ and: 'Forwarded Subscription Event Cache is present'
+ assert Hazelcast.allHazelcastInstances.name.contains('hazelCastInstanceSubscriptionEvents')
+ }
+
+ def 'Verify configs for Distributed Caches'(){
+ given: 'the Forwarded Subscription Event Cache config'
+ def forwardedSubscriptionEventCacheConfig = Hazelcast.getHazelcastInstanceByName('hazelCastInstanceSubscriptionEvents').config.mapConfigs.get('forwardedSubscriptionEventCacheMapConfig')
+ expect: 'system created instance with correct config'
+ assert forwardedSubscriptionEventCacheConfig.backupCount == 3
+ assert forwardedSubscriptionEventCacheConfig.asyncBackupCount == 3
+ }
+
+ def 'Verify deployment network configs for Distributed Caches'() {
+ given: 'the Forwarded Subscription Event Cache config'
+ def forwardedSubscriptionEventCacheNetworkConfig = Hazelcast.getHazelcastInstanceByName('hazelCastInstanceSubscriptionEvents').config.networkConfig
+ expect: 'system created instance with correct config'
+ assert forwardedSubscriptionEventCacheNetworkConfig.join.autoDetectionConfig.enabled
+ assert !forwardedSubscriptionEventCacheNetworkConfig.join.kubernetesConfig.enabled
+ }
+
+ def 'Verify network config'() {
+ given: 'Synchronization config object and test configuration'
+ def objectUnderTest = new ForwardedSubscriptionEventCacheConfig()
+ def testConfig = new Config()
+ when: 'kubernetes properties are enabled'
+ objectUnderTest.cacheKubernetesEnabled = true
+ objectUnderTest.cacheKubernetesServiceName = 'test-service-name'
+ and: 'method called to update the discovery mode'
+ objectUnderTest.updateDiscoveryMode(testConfig)
+ then: 'applied properties are reflected'
+ assert testConfig.networkConfig.join.kubernetesConfig.enabled
+ assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name'
+
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy
new file mode 100644
index 0000000000..a673462008
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.hazelcast.map.IMap
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.boot.test.context.SpringBootTest
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
+
+ IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
+
+ def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache)
+
+
+ def 'Consume Subscription Event Response where all DMIs have responded'() {
+ given: 'a subscription event response with a clientId, subscriptionName and dmiName'
+ def testEventReceived = new SubscriptionEventResponse()
+ testEventReceived.clientId = 'some-client-id'
+ testEventReceived.subscriptionName = 'some-subscription-name'
+ testEventReceived.dmiName = 'some-dmi-name'
+ and: 'notifications are enabled'
+ objectUnderTest.notificationFeatureEnabled = true
+ and: 'subscription model loader is enabled'
+ objectUnderTest.subscriptionModelLoaderEnabled = true
+ when: 'the valid event is consumed'
+ objectUnderTest.consumeSubscriptionEventResponse(testEventReceived)
+ then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
+ 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
+ 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name'] as Set)
+ and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed'
+ 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> ([] as Set)
+ and: 'the subscription event is removed from the map'
+ 1 * mockForwardedSubscriptionEventCache.remove('some-client-idsome-subscription-name')
+ }
+
+ def 'Consume Subscription Event Response where another DMI has not yet responded'() {
+ given: 'a subscription event response with a clientId, subscriptionName and dmiName'
+ def testEventReceived = new SubscriptionEventResponse()
+ testEventReceived.clientId = 'some-client-id'
+ testEventReceived.subscriptionName = 'some-subscription-name'
+ testEventReceived.dmiName = 'some-dmi-name'
+ and: 'notifications are enabled'
+ objectUnderTest.notificationFeatureEnabled = true
+ and: 'subscription model loader is enabled'
+ objectUnderTest.subscriptionModelLoaderEnabled = true
+ when: 'the valid event is consumed'
+ objectUnderTest.consumeSubscriptionEventResponse(testEventReceived)
+ then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
+ 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
+ 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name', 'non-responded-dmi'] as Set)
+ and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed'
+ 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['non-responded-dmi'] as Set)
+ and: 'the subscription event is not removed from the map'
+ 0 * mockForwardedSubscriptionEventCache.remove(_)
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy
index 3a7aa481c3..f2ff1f7b23 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy
@@ -48,7 +48,7 @@ class SubscriptionEventMapperSpec extends Specification {
def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
then: 'the resulting yang model subscription event contains the correct clientId'
assert result.clientId == "SCO-9989752"
- and: 'client name'
+ and: 'subscription name'
assert result.subscriptionName == "cm-subscription-001"
and: 'is tagged value is false'
assert !result.isTagged
@@ -60,4 +60,21 @@ class SubscriptionEventMapperSpec extends Specification {
assert result.topic == null
}
+ def 'Map null subscription event to yang model subscription event where #scenario'() {
+ given: 'a new Subscription Event with no data'
+ def testEventToMap = new SubscriptionEvent()
+ when: 'the event is mapped to a yang model subscription'
+ def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap)
+ then: 'the resulting yang model subscription event contains null clientId'
+ assert result.clientId == null
+ and: 'subscription name is null'
+ assert result.subscriptionName == null
+ and: 'is tagged value is false'
+ assert result.isTagged == false
+ and: 'predicates is null'
+ assert result.predicates == null
+ and: 'the topic is null'
+ assert result.topic == null
+ }
+
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
index 2b0adf3420..457eb6fa99 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
+import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
@@ -29,20 +30,28 @@ import org.onap.cps.ncmp.event.model.SubscriptionEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException
import org.onap.cps.utils.JsonObjectMapper
+import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
+import spock.util.concurrent.BlockingVariable
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder])
class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
- def mockInventoryPersistence = Mock(InventoryPersistence)
- def mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
- def objectUnderTest = new SubscriptionEventForwarder(mockInventoryPersistence, mockSubscriptionEventPublisher)
+ @Autowired
+ SubscriptionEventForwarder objectUnderTest
+
+ @SpringBean
+ InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
+ @SpringBean
+ EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
+ @SpringBean
+ IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
@Autowired
JsonObjectMapper jsonObjectMapper
- def 'Forward valid CM create subscription'() {
+ def 'Forward valid CM create subscription and simulate timeout where #scenario'() {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
@@ -52,9 +61,17 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
createYangModelCmHandleWithDmiProperty(2, 1,"shape","square"),
createYangModelCmHandleWithDmiProperty(3, 2,"shape","triangle")
]
+ and: 'the thread creation delay is reduced to 2 seconds for testing'
+ objectUnderTest.dmiResponseTimeoutInMs = 2000
+ and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
+ def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
- then: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
+ then: 'An asynchronous call is made to the blocking variable'
+ block.get()
+ then: 'the event is added to the forwarded subscription event cache'
+ 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
+ and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
subscriptionEvent -> {
Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
@@ -68,6 +85,15 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
targets["CMHandle3"] == ["shape":"triangle"]
}
)
+ and: 'a separate thread has been created where the map is polled'
+ 1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
+ 1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap)
+ and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
+ 1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
+ where:
+ scenario | DMINamesInMap
+ 'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set
+ 'all dmis have responded ' | [] as Set
}
def 'Forward CM create subscription where target CM Handles are #scenario'() {