diff options
15 files changed, 578 insertions, 127 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index b95d9eb43b..1eb1c11f20 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -99,6 +99,9 @@ app: topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m} avc: subscription-topic: ${NCMP_CM_AVC_SUBSCRIPTION:cm-avc-subscription} + subscription-forward-topic: ${NCMP_FORWARD_CM_AVC_SUBSCRIPTION:ncmp-dmi-cm-avc-subscription} + subscription-response-topic: ${NCMP_RESPONSE_CM_AVC_SUBSCRIPTION:dmi-ncmp-cm-avc-subscription} + subscription-outcome-topic: ${NCMP_OUTCOME_CM_AVC_SUBSCRIPTION:cm-avc-subscription-response} cm-events-topic: ${NCMP_CM_EVENTS_TOPIC:cm-events} lcm: events: @@ -181,6 +184,10 @@ ncmp: sleep-time-ms: 300000 cm-handle-data-sync: sleep-time-ms: 30000 + subscription-forwarding: + dmi-response-timeout-ms: 30000 + model-loader: + retry-time-ms: 1000 modules-sync-watchdog: async-executor: @@ -188,12 +195,11 @@ ncmp: model-loader: subscription: false - maximumAttemptCount: 20 - retryTimeMs: 1000 + maximum-attempt-count: 20 # Custom Hazelcast Config. hazelcast: - mode: - kubernetes: - enabled: ${HAZELCAST_MODE_KUBERNETES_ENABLED:false} - service-name: ${CPS_NCMP_SERVICE_NAME:"cps-and-ncmp-service"}
\ No newline at end of file + mode: + kubernetes: + enabled: ${HAZELCAST_MODE_KUBERNETES_ENABLED:false} + service-name: ${CPS_NCMP_SERVICE_NAME:"cps-and-ncmp-service"}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java index 0b67266375..ff7afc9eb7 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -20,17 +20,13 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache; -import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; -import com.hazelcast.config.NamedConfig; import com.hazelcast.config.QueueConfig; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.cache.HazelcastCacheConfig; import org.onap.cps.spi.model.DataNode; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,18 +35,12 @@ import org.springframework.context.annotation.Configuration; */ @Slf4j @Configuration -public class SynchronizationCacheConfig { +public class SynchronizationCacheConfig extends HazelcastCacheConfig { public static final int MODULE_SYNC_STARTED_TTL_SECS = 600; public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800; - @Value("${hazelcast.mode.kubernetes.enabled}") - private boolean cacheKubernetesEnabled; - - @Value("${hazelcast.mode.kubernetes.service-name}") - private String cacheKubernetesServiceName; - - private static final QueueConfig commonQueueConfig = createQueueConfig(); + private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig"); private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig"); private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); @@ -61,7 +51,8 @@ public class SynchronizationCacheConfig { */ @Bean public BlockingQueue<DataNode> moduleSyncWorkQueue() { - return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig) + return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig, + "synchronization-caches") .getQueue("moduleSyncWorkQueue"); } @@ -72,7 +63,8 @@ public class SynchronizationCacheConfig { */ @Bean public IMap<String, Object> moduleSyncStartedOnCmHandles() { - return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig) + return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig, + "synchronization-caches") .getMap("moduleSyncStartedOnCmHandles"); } @@ -83,48 +75,8 @@ public class SynchronizationCacheConfig { */ @Bean public IMap<String, Boolean> dataSyncSemaphores() { - return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig) + return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig, + "synchronization-caches") .getMap("dataSyncSemaphores"); } - - private HazelcastInstance createHazelcastInstance( - final String hazelcastInstanceName, final NamedConfig namedConfig) { - return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig)); - } - - private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) { - final Config config = new Config(instanceName); - if (namedConfig instanceof MapConfig) { - config.addMapConfig((MapConfig) namedConfig); - } - if (namedConfig instanceof QueueConfig) { - config.addQueueConfig((QueueConfig) namedConfig); - } - config.setClusterName("synchronization-caches"); - updateDiscoveryMode(config); - return config; - } - - private static QueueConfig createQueueConfig() { - final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig"); - commonQueueConfig.setBackupCount(3); - commonQueueConfig.setAsyncBackupCount(3); - return commonQueueConfig; - } - - private static MapConfig createMapConfig(final String configName) { - final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setBackupCount(3); - mapConfig.setAsyncBackupCount(3); - return mapConfig; - } - - private void updateDiscoveryMode(final Config config) { - if (cacheKubernetesEnabled) { - log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName); - config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true) - .setProperty("service-name", cacheKubernetesServiceName); - } - } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java new file mode 100644 index 0000000000..d2c3dc2599 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import com.hazelcast.config.MapConfig; +import com.hazelcast.map.IMap; +import java.util.Set; +import org.onap.cps.cache.HazelcastCacheConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Core infrastructure of the hazelcast distributed cache for subscription forward config use cases. + */ +@Configuration +public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig { + + private static final MapConfig forwardedSubscriptionEventCacheMapConfig = + createMapConfig("forwardedSubscriptionEventCacheMapConfig"); + + /** + * Distributed instance of forwarded subscription information cache that contains subscription event + * id by dmi names as properties. + * + * @return configured map of subscription event ids as keys to sets of dmi names for values + */ + @Bean + public IMap<String, Set<String>> forwardedSubscriptionEventCache() { + return createHazelcastInstance("hazelCastInstanceSubscriptionEvents", + forwardedSubscriptionEventCacheMapConfig, "cps-ncmp-service-caches") + .getMap("forwardedSubscriptionEventCache"); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java new file mode 100644 index 0000000000..e7edecfacc --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import com.hazelcast.map.IMap; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class ResponseTimeoutTask implements Runnable { + + private final IMap<String, Set<String>> forwardedSubscriptionEventCache; + private final String subscriptionEventId; + + @Override + public void run() { + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + if (dmiNames.isEmpty()) { + //TODO full outcome response here + log.info("placeholder to create full outcome response for subscriptionEventId: {}.", + subscriptionEventId); + } else { + //TODO partial outcome response here + log.info("placeholder to create partial outcome response for subscriptionEventId: {}.", + subscriptionEventId); + } + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java new file mode 100644 index 0000000000..b332ad1a0e --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import com.hazelcast.map.IMap; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventResponseConsumer { + + private final IMap<String, Set<String>> forwardedSubscriptionEventCache; + + @Value("${app.ncmp.avc.subscription-outcome-topic}") + private String subscriptionOutcomeEventTopic; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + + @Value("${ncmp.model-loader.subscription:false}") + private boolean subscriptionModelLoaderEnabled; + + /** + * Consume subscription response event. + * + * @param subscriptionEventResponse the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) + public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) { + log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId()); + final String subscriptionEventId = subscriptionEventResponse.getClientId() + + subscriptionEventResponse.getSubscriptionName(); + final boolean createOutcomeResponse; + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName()); + createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); + if (createOutcomeResponse) { + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } else { + createOutcomeResponse = true; + } + if (subscriptionModelLoaderEnabled) { + updateSubscriptionEvent(subscriptionEventResponse); + } + if (createOutcomeResponse && notificationFeatureEnabled) { + log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId); + //TODO Create outcome response + } + } + + private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) { + log.info("placeholder to update persisted subscription for subscriptionEventId: {}.", + subscriptionEventResponse.getClientId() + subscriptionEventResponse.getSubscriptionName()); + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 1fb72a59d3..4afa051d30 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -20,20 +20,28 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; +import com.hazelcast.map.IMap; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.event.model.SubscriptionEvent; import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -44,8 +52,15 @@ public class SubscriptionEventForwarder { private final InventoryPersistence inventoryPersistence; private final EventsPublisher<SubscriptionEvent> eventsPublisher; + private final IMap<String, Set<String>> forwardedSubscriptionEventCache; + + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-"; + @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}") + private int dmiResponseTimeoutInMs; + /** * Forward subscription event. * @@ -65,15 +80,32 @@ public class SubscriptionEventForwarder { final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); - dmiPropertiesPerCmHandleIdPerServiceName.forEach((dmiServiceName, cmHandlePropertiesMap) -> { - subscriptionEvent.getEvent().getPredicates().setTargets(Collections - .singletonList(cmHandlePropertiesMap)); - final String eventKey = createEventKey(subscriptionEvent, dmiServiceName); - eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiServiceName, eventKey, - subscriptionEvent); + + final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); + startResponseTimeout(subscriptionEvent, dmisToRespond); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent); + } + + private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap, + final SubscriptionEvent subscriptionEvent) { + dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { + subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); + final String eventKey = createEventKey(subscriptionEvent, dmiName); + eventsPublisher.publishEvent( + DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent); }); } + private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) { + final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID() + + subscriptionEvent.getEvent().getSubscription().getName(); + + forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond); + final ResponseTimeoutTask responseTimeoutTask = + new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId); + executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); + } + private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { return subscriptionEvent.getEvent().getSubscription().getClientID() + "-" diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java new file mode 100644 index 0000000000..95e773c8c9 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.models; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Getter +@Setter +public class SubscriptionEventResponse { + private String clientId; + private String subscriptionName; + private String dmiName; + private Map<String, SubscriptionStatus> cmHandleIdToStatus; +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java index 5a418fd312..659779acf9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java @@ -52,10 +52,10 @@ public class SubscriptionModelLoader implements ModelLoader { private static final String SUBSCRIPTION_SCHEMASET_NAME = "subscriptions"; private static final String SUBSCRIPTION_REGISTRY_DATANODE_NAME = "subscription-registry"; - @Value("${ncmp.model-loader.maximumAttemptCount:20}") + @Value("${ncmp.model-loader.maximum-attempt-count:20}") private int maximumAttemptCount; - @Value("${ncmp.model-loader.retryTimeMs:1000}") + @Value("${ncmp.timers.model-loader.retry-time-ms:1000}") private long retryTimeMs; @Value("${ncmp.model-loader.subscription:false}") @@ -99,7 +99,7 @@ public class SubscriptionModelLoader implements ModelLoader { } } else { throw new NcmpStartUpException("Retrieval of NCMP dataspace fails", - "NCMP dataspace does not exist"); + "NCMP dataspace does not exist"); } } } @@ -139,7 +139,7 @@ public class SubscriptionModelLoader implements ModelLoader { */ @Override public boolean createAnchor(final String dataspaceName, final String schemaSetName, - final String anchorName) { + final String anchorName) { try { cpsAdminService.createAnchor(dataspaceName, schemaSetName, anchorName); } catch (final AlreadyDefinedException exception) { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy index 6829d834d2..567debdded 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy @@ -51,8 +51,8 @@ class SynchronizationCacheConfigSpec extends Specification { assert null != moduleSyncStartedOnCmHandles and: 'system is able to create an instance of a map to hold data sync semaphores' assert null != dataSyncSemaphores - and: 'there 3 instances' - assert Hazelcast.allHazelcastInstances.size() == 3 + and: 'there are at least 3 instances' + assert Hazelcast.allHazelcastInstances.size() > 2 and: 'they have the correct names (in any order)' assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' ) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy new file mode 100644 index 0000000000..7448daf598 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfigSpec.groovy @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc + +import com.hazelcast.config.Config +import com.hazelcast.core.Hazelcast +import com.hazelcast.map.IMap +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification + +@SpringBootTest(classes = [ForwardedSubscriptionEventCacheConfig]) +class ForwardedSubscriptionEventCacheConfigSpec extends Specification { + + @Autowired + private IMap<String, Set<String>> forwardedSubscriptionEventCache + + def 'Embedded (hazelcast) cache for Forwarded Subscription Event Cache.'() { + expect: 'system is able to create an instance of the Forwarded Subscription Event Cache' + assert null != forwardedSubscriptionEventCache + and: 'there is at least 1 instance' + assert Hazelcast.allHazelcastInstances.size() > 0 + and: 'Forwarded Subscription Event Cache is present' + assert Hazelcast.allHazelcastInstances.name.contains('hazelCastInstanceSubscriptionEvents') + } + + def 'Verify configs for Distributed Caches'(){ + given: 'the Forwarded Subscription Event Cache config' + def forwardedSubscriptionEventCacheConfig = Hazelcast.getHazelcastInstanceByName('hazelCastInstanceSubscriptionEvents').config.mapConfigs.get('forwardedSubscriptionEventCacheMapConfig') + expect: 'system created instance with correct config' + assert forwardedSubscriptionEventCacheConfig.backupCount == 3 + assert forwardedSubscriptionEventCacheConfig.asyncBackupCount == 3 + } + + def 'Verify deployment network configs for Distributed Caches'() { + given: 'the Forwarded Subscription Event Cache config' + def forwardedSubscriptionEventCacheNetworkConfig = Hazelcast.getHazelcastInstanceByName('hazelCastInstanceSubscriptionEvents').config.networkConfig + expect: 'system created instance with correct config' + assert forwardedSubscriptionEventCacheNetworkConfig.join.autoDetectionConfig.enabled + assert !forwardedSubscriptionEventCacheNetworkConfig.join.kubernetesConfig.enabled + } + + def 'Verify network config'() { + given: 'Synchronization config object and test configuration' + def objectUnderTest = new ForwardedSubscriptionEventCacheConfig() + def testConfig = new Config() + when: 'kubernetes properties are enabled' + objectUnderTest.cacheKubernetesEnabled = true + objectUnderTest.cacheKubernetesServiceName = 'test-service-name' + and: 'method called to update the discovery mode' + objectUnderTest.updateDiscoveryMode(testConfig) + then: 'applied properties are reflected' + assert testConfig.networkConfig.join.kubernetesConfig.enabled + assert testConfig.networkConfig.join.kubernetesConfig.properties.get('service-name') == 'test-service-name' + + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy new file mode 100644 index 0000000000..a673462008 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy @@ -0,0 +1,79 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc + +import com.fasterxml.jackson.databind.ObjectMapper +import com.hazelcast.map.IMap +import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse +import org.onap.cps.utils.JsonObjectMapper +import org.springframework.boot.test.context.SpringBootTest + +@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) +class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec { + + IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>) + + def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache) + + + def 'Consume Subscription Event Response where all DMIs have responded'() { + given: 'a subscription event response with a clientId, subscriptionName and dmiName' + def testEventReceived = new SubscriptionEventResponse() + testEventReceived.clientId = 'some-client-id' + testEventReceived.subscriptionName = 'some-subscription-name' + testEventReceived.dmiName = 'some-dmi-name' + and: 'notifications are enabled' + objectUnderTest.notificationFeatureEnabled = true + and: 'subscription model loader is enabled' + objectUnderTest.subscriptionModelLoaderEnabled = true + when: 'the valid event is consumed' + objectUnderTest.consumeSubscriptionEventResponse(testEventReceived) + then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event' + 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true + 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name'] as Set) + and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed' + 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> ([] as Set) + and: 'the subscription event is removed from the map' + 1 * mockForwardedSubscriptionEventCache.remove('some-client-idsome-subscription-name') + } + + def 'Consume Subscription Event Response where another DMI has not yet responded'() { + given: 'a subscription event response with a clientId, subscriptionName and dmiName' + def testEventReceived = new SubscriptionEventResponse() + testEventReceived.clientId = 'some-client-id' + testEventReceived.subscriptionName = 'some-subscription-name' + testEventReceived.dmiName = 'some-dmi-name' + and: 'notifications are enabled' + objectUnderTest.notificationFeatureEnabled = true + and: 'subscription model loader is enabled' + objectUnderTest.subscriptionModelLoaderEnabled = true + when: 'the valid event is consumed' + objectUnderTest.consumeSubscriptionEventResponse(testEventReceived) + then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event' + 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true + 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name', 'non-responded-dmi'] as Set) + and: 'the forwarded subscription event cache returns an empty Map when the dmiName has been removed' + 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['non-responded-dmi'] as Set) + and: 'the subscription event is not removed from the map' + 0 * mockForwardedSubscriptionEventCache.remove(_) + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy index 3a7aa481c3..f2ff1f7b23 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy @@ -48,7 +48,7 @@ class SubscriptionEventMapperSpec extends Specification { def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap) then: 'the resulting yang model subscription event contains the correct clientId' assert result.clientId == "SCO-9989752" - and: 'client name' + and: 'subscription name' assert result.subscriptionName == "cm-subscription-001" and: 'is tagged value is false' assert !result.isTagged @@ -60,4 +60,21 @@ class SubscriptionEventMapperSpec extends Specification { assert result.topic == null } + def 'Map null subscription event to yang model subscription event where #scenario'() { + given: 'a new Subscription Event with no data' + def testEventToMap = new SubscriptionEvent() + when: 'the event is mapped to a yang model subscription' + def result = objectUnderTest.toYangModelSubscriptionEvent(testEventToMap) + then: 'the resulting yang model subscription event contains null clientId' + assert result.clientId == null + and: 'subscription name is null' + assert result.subscriptionName == null + and: 'is tagged value is false' + assert result.isTagged == false + and: 'predicates is null' + assert result.predicates == null + and: 'the topic is null' + assert result.topic == null + } + }
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy index 2b0adf3420..457eb6fa99 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper +import com.hazelcast.map.IMap import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle import org.onap.cps.ncmp.api.inventory.InventoryPersistence @@ -29,20 +30,28 @@ import org.onap.cps.ncmp.event.model.SubscriptionEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.spi.exceptions.OperationNotYetSupportedException import org.onap.cps.utils.JsonObjectMapper +import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest +import spock.util.concurrent.BlockingVariable -@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) +@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder]) class SubscriptionEventForwarderSpec extends MessagingBaseSpec { - def mockInventoryPersistence = Mock(InventoryPersistence) - def mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>) - def objectUnderTest = new SubscriptionEventForwarder(mockInventoryPersistence, mockSubscriptionEventPublisher) + @Autowired + SubscriptionEventForwarder objectUnderTest + + @SpringBean + InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence) + @SpringBean + EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>) + @SpringBean + IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>) @Autowired JsonObjectMapper jsonObjectMapper - def 'Forward valid CM create subscription'() { + def 'Forward valid CM create subscription and simulate timeout where #scenario'() { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) @@ -52,9 +61,17 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { createYangModelCmHandleWithDmiProperty(2, 1,"shape","square"), createYangModelCmHandleWithDmiProperty(3, 2,"shape","triangle") ] + and: 'the thread creation delay is reduced to 2 seconds for testing' + objectUnderTest.dmiResponseTimeoutInMs = 2000 + and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds' + def block = new BlockingVariable<Object>(5) when: 'the valid event is forwarded' objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) - then: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future' + then: 'An asynchronous call is made to the blocking variable' + block.get() + then: 'the event is added to the forwarded subscription event cache' + 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set) + and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future' 1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1", subscriptionEvent -> { Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0) @@ -68,6 +85,15 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { targets["CMHandle3"] == ["shape":"triangle"] } ) + and: 'a separate thread has been created where the map is polled' + 1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true + 1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap) + and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable' + 1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)} + where: + scenario | DMINamesInMap + 'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set + 'all dmis have responded ' | [] as Set } def 'Forward CM create subscription where target CM Handles are #scenario'() { diff --git a/cps-service/src/main/java/org/onap/cps/cache/AnchorDataCacheConfig.java b/cps-service/src/main/java/org/onap/cps/cache/AnchorDataCacheConfig.java index 3acba0bb3b..5ee6b3804e 100644 --- a/cps-service/src/main/java/org/onap/cps/cache/AnchorDataCacheConfig.java +++ b/cps-service/src/main/java/org/onap/cps/cache/AnchorDataCacheConfig.java @@ -20,29 +20,16 @@ package org.onap.cps.cache; -import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; -import com.hazelcast.config.NamedConfig; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; import com.hazelcast.map.IMap; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Core infrastructure of the hazelcast distributed cache for anchor data config use cases. */ -@Slf4j @Configuration -public class AnchorDataCacheConfig { - - @Value("${hazelcast.mode.kubernetes.enabled}") - private boolean cacheKubernetesEnabled; - - @Value("${hazelcast.mode.kubernetes.service-name}") - private String cacheKubernetesServiceName; +public class AnchorDataCacheConfig extends HazelcastCacheConfig { private static final MapConfig anchorDataCacheMapConfig = createMapConfig("anchorDataCacheMapConfig"); @@ -53,36 +40,7 @@ public class AnchorDataCacheConfig { */ @Bean public IMap<String, AnchorDataCacheEntry> anchorDataCache() { - return createHazelcastInstance("hazelCastInstanceCpsCore", anchorDataCacheMapConfig) - .getMap("anchorDataCache"); - } - - private HazelcastInstance createHazelcastInstance(final String hazelcastInstanceName, - final NamedConfig namedConfig) { - return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig)); - } - - private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) { - final Config config = new Config(instanceName); - config.addMapConfig((MapConfig) namedConfig); - config.setClusterName("cps-service-caches"); - updateDiscoveryMode(config); - return config; + return createHazelcastInstance("hazelCastInstanceCpsCore", anchorDataCacheMapConfig, "cps-service-caches") + .getMap("anchorDataCache"); } - - private static MapConfig createMapConfig(final String configName) { - final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setBackupCount(3); - mapConfig.setAsyncBackupCount(3); - return mapConfig; - } - - private void updateDiscoveryMode(final Config config) { - if (cacheKubernetesEnabled) { - log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName); - config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true) - .setProperty("service-name", cacheKubernetesServiceName); - } - } - } diff --git a/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java new file mode 100644 index 0000000000..4aebceae0a --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/cache/HazelcastCacheConfig.java @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.cache; + +import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.NamedConfig; +import com.hazelcast.config.QueueConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; + +/** + * Core infrastructure of the hazelcast distributed cache. + */ +@Slf4j +public class HazelcastCacheConfig { + + @Value("${hazelcast.mode.kubernetes.enabled}") + protected boolean cacheKubernetesEnabled; + + @Value("${hazelcast.mode.kubernetes.service-name}") + protected String cacheKubernetesServiceName; + + protected HazelcastInstance createHazelcastInstance(final String hazelcastInstanceName, + final NamedConfig namedConfig, final String clusterName) { + return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig, clusterName)); + } + + private Config initializeConfig(final String instanceName, final NamedConfig namedConfig, + final String clusterName) { + final Config config = new Config(instanceName); + if (namedConfig instanceof MapConfig) { + config.addMapConfig((MapConfig) namedConfig); + } + if (namedConfig instanceof QueueConfig) { + config.addQueueConfig((QueueConfig) namedConfig); + } + config.setClusterName(clusterName); + updateDiscoveryMode(config); + return config; + } + + protected static MapConfig createMapConfig(final String configName) { + final MapConfig mapConfig = new MapConfig(configName); + mapConfig.setBackupCount(3); + mapConfig.setAsyncBackupCount(3); + return mapConfig; + } + + protected static QueueConfig createQueueConfig(final String configName) { + final QueueConfig commonQueueConfig = new QueueConfig(configName); + commonQueueConfig.setBackupCount(3); + commonQueueConfig.setAsyncBackupCount(3); + return commonQueueConfig; + } + + protected void updateDiscoveryMode(final Config config) { + if (cacheKubernetesEnabled) { + log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName); + config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true) + .setProperty("service-name", cacheKubernetesServiceName); + } + } + +} |