From 3c0ea89e2bd77d2f4f86f0729643455e1b29c5ac Mon Sep 17 00:00:00 2001 From: lukegleeson Date: Thu, 6 Apr 2023 15:28:56 +0100 Subject: Timeout for Subscription Create Partial Response - Implemented default 30s timeout for DMI Responses - Placeholders have been TODO'd for Outcome Response generation and Persisted Subscription Updating - Refactored common HazelcastCacheConfig methods - Some tests required a blocking variable due to seperate thread usage Issue-ID: CPS-1599 Signed-off-by: lukegleeson Change-Id: I2b1a35e93939daa0524d379ac4736d714ef61a6f --- .../embeddedcache/SynchronizationCacheConfig.java | 66 +++-------------- .../avc/ForwardedSubscriptionEventCacheConfig.java | 51 ++++++++++++++ .../api/impl/event/avc/ResponseTimeoutTask.java | 51 ++++++++++++++ .../avc/SubscriptionEventResponseConsumer.java | 82 ++++++++++++++++++++++ .../SubscriptionEventForwarder.java | 44 ++++++++++-- .../ncmp/api/models/SubscriptionEventResponse.java | 37 ++++++++++ .../cps/ncmp/init/SubscriptionModelLoader.java | 8 +-- 7 files changed, 272 insertions(+), 67 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java (limited to 'cps-ncmp-service/src/main/java/org/onap') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java index 0b67266375..ff7afc9eb7 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -20,17 +20,13 @@ package org.onap.cps.ncmp.api.impl.config.embeddedcache; -import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; -import com.hazelcast.config.NamedConfig; import com.hazelcast.config.QueueConfig; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; import com.hazelcast.map.IMap; import java.util.concurrent.BlockingQueue; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.cache.HazelcastCacheConfig; import org.onap.cps.spi.model.DataNode; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,18 +35,12 @@ import org.springframework.context.annotation.Configuration; */ @Slf4j @Configuration -public class SynchronizationCacheConfig { +public class SynchronizationCacheConfig extends HazelcastCacheConfig { public static final int MODULE_SYNC_STARTED_TTL_SECS = 600; public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800; - @Value("${hazelcast.mode.kubernetes.enabled}") - private boolean cacheKubernetesEnabled; - - @Value("${hazelcast.mode.kubernetes.service-name}") - private String cacheKubernetesServiceName; - - private static final QueueConfig commonQueueConfig = createQueueConfig(); + private static final QueueConfig commonQueueConfig = createQueueConfig("defaultQueueConfig"); private static final MapConfig moduleSyncStartedConfig = createMapConfig("moduleSyncStartedConfig"); private static final MapConfig dataSyncSemaphoresConfig = createMapConfig("dataSyncSemaphoresConfig"); @@ -61,7 +51,8 @@ public class SynchronizationCacheConfig { */ @Bean public BlockingQueue moduleSyncWorkQueue() { - return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig) + return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig, + "synchronization-caches") .getQueue("moduleSyncWorkQueue"); } @@ -72,7 +63,8 @@ public class SynchronizationCacheConfig { */ @Bean public IMap moduleSyncStartedOnCmHandles() { - return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig) + return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig, + "synchronization-caches") .getMap("moduleSyncStartedOnCmHandles"); } @@ -83,48 +75,8 @@ public class SynchronizationCacheConfig { */ @Bean public IMap dataSyncSemaphores() { - return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig) + return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig, + "synchronization-caches") .getMap("dataSyncSemaphores"); } - - private HazelcastInstance createHazelcastInstance( - final String hazelcastInstanceName, final NamedConfig namedConfig) { - return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig)); - } - - private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) { - final Config config = new Config(instanceName); - if (namedConfig instanceof MapConfig) { - config.addMapConfig((MapConfig) namedConfig); - } - if (namedConfig instanceof QueueConfig) { - config.addQueueConfig((QueueConfig) namedConfig); - } - config.setClusterName("synchronization-caches"); - updateDiscoveryMode(config); - return config; - } - - private static QueueConfig createQueueConfig() { - final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig"); - commonQueueConfig.setBackupCount(3); - commonQueueConfig.setAsyncBackupCount(3); - return commonQueueConfig; - } - - private static MapConfig createMapConfig(final String configName) { - final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setBackupCount(3); - mapConfig.setAsyncBackupCount(3); - return mapConfig; - } - - private void updateDiscoveryMode(final Config config) { - if (cacheKubernetesEnabled) { - log.info("Enabling kubernetes mode with service-name : {}", cacheKubernetesServiceName); - config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true) - .setProperty("service-name", cacheKubernetesServiceName); - } - } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java new file mode 100644 index 0000000000..d2c3dc2599 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import com.hazelcast.config.MapConfig; +import com.hazelcast.map.IMap; +import java.util.Set; +import org.onap.cps.cache.HazelcastCacheConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Core infrastructure of the hazelcast distributed cache for subscription forward config use cases. + */ +@Configuration +public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig { + + private static final MapConfig forwardedSubscriptionEventCacheMapConfig = + createMapConfig("forwardedSubscriptionEventCacheMapConfig"); + + /** + * Distributed instance of forwarded subscription information cache that contains subscription event + * id by dmi names as properties. + * + * @return configured map of subscription event ids as keys to sets of dmi names for values + */ + @Bean + public IMap> forwardedSubscriptionEventCache() { + return createHazelcastInstance("hazelCastInstanceSubscriptionEvents", + forwardedSubscriptionEventCacheMapConfig, "cps-ncmp-service-caches") + .getMap("forwardedSubscriptionEventCache"); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java new file mode 100644 index 0000000000..e7edecfacc --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import com.hazelcast.map.IMap; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class ResponseTimeoutTask implements Runnable { + + private final IMap> forwardedSubscriptionEventCache; + private final String subscriptionEventId; + + @Override + public void run() { + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + if (dmiNames.isEmpty()) { + //TODO full outcome response here + log.info("placeholder to create full outcome response for subscriptionEventId: {}.", + subscriptionEventId); + } else { + //TODO partial outcome response here + log.info("placeholder to create partial outcome response for subscriptionEventId: {}.", + subscriptionEventId); + } + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java new file mode 100644 index 0000000000..b332ad1a0e --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import com.hazelcast.map.IMap; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventResponseConsumer { + + private final IMap> forwardedSubscriptionEventCache; + + @Value("${app.ncmp.avc.subscription-outcome-topic}") + private String subscriptionOutcomeEventTopic; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + + @Value("${ncmp.model-loader.subscription:false}") + private boolean subscriptionModelLoaderEnabled; + + /** + * Consume subscription response event. + * + * @param subscriptionEventResponse the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) + public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) { + log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId()); + final String subscriptionEventId = subscriptionEventResponse.getClientId() + + subscriptionEventResponse.getSubscriptionName(); + final boolean createOutcomeResponse; + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName()); + createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); + if (createOutcomeResponse) { + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } else { + createOutcomeResponse = true; + } + if (subscriptionModelLoaderEnabled) { + updateSubscriptionEvent(subscriptionEventResponse); + } + if (createOutcomeResponse && notificationFeatureEnabled) { + log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId); + //TODO Create outcome response + } + } + + private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) { + log.info("placeholder to update persisted subscription for subscriptionEventId: {}.", + subscriptionEventResponse.getClientId() + subscriptionEventResponse.getSubscriptionName()); + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 1fb72a59d3..4afa051d30 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -20,20 +20,28 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; +import com.hazelcast.map.IMap; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.event.model.SubscriptionEvent; import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -44,8 +52,15 @@ public class SubscriptionEventForwarder { private final InventoryPersistence inventoryPersistence; private final EventsPublisher eventsPublisher; + private final IMap> forwardedSubscriptionEventCache; + + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-"; + @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}") + private int dmiResponseTimeoutInMs; + /** * Forward subscription event. * @@ -65,15 +80,32 @@ public class SubscriptionEventForwarder { final Map>> dmiPropertiesPerCmHandleIdPerServiceName = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); - dmiPropertiesPerCmHandleIdPerServiceName.forEach((dmiServiceName, cmHandlePropertiesMap) -> { - subscriptionEvent.getEvent().getPredicates().setTargets(Collections - .singletonList(cmHandlePropertiesMap)); - final String eventKey = createEventKey(subscriptionEvent, dmiServiceName); - eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiServiceName, eventKey, - subscriptionEvent); + + final Set dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); + startResponseTimeout(subscriptionEvent, dmisToRespond); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent); + } + + private void forwardEventToDmis(final Map>> dmiNameCmHandleMap, + final SubscriptionEvent subscriptionEvent) { + dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { + subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); + final String eventKey = createEventKey(subscriptionEvent, dmiName); + eventsPublisher.publishEvent( + DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent); }); } + private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set dmisToRespond) { + final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID() + + subscriptionEvent.getEvent().getSubscription().getName(); + + forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond); + final ResponseTimeoutTask responseTimeoutTask = + new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId); + executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); + } + private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { return subscriptionEvent.getEvent().getSubscription().getClientID() + "-" diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java new file mode 100644 index 0000000000..95e773c8c9 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/SubscriptionEventResponse.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.models; + +import com.fasterxml.jackson.annotation.JsonInclude; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@Getter +@Setter +public class SubscriptionEventResponse { + private String clientId; + private String subscriptionName; + private String dmiName; + private Map cmHandleIdToStatus; +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java index 5a418fd312..659779acf9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java @@ -52,10 +52,10 @@ public class SubscriptionModelLoader implements ModelLoader { private static final String SUBSCRIPTION_SCHEMASET_NAME = "subscriptions"; private static final String SUBSCRIPTION_REGISTRY_DATANODE_NAME = "subscription-registry"; - @Value("${ncmp.model-loader.maximumAttemptCount:20}") + @Value("${ncmp.model-loader.maximum-attempt-count:20}") private int maximumAttemptCount; - @Value("${ncmp.model-loader.retryTimeMs:1000}") + @Value("${ncmp.timers.model-loader.retry-time-ms:1000}") private long retryTimeMs; @Value("${ncmp.model-loader.subscription:false}") @@ -99,7 +99,7 @@ public class SubscriptionModelLoader implements ModelLoader { } } else { throw new NcmpStartUpException("Retrieval of NCMP dataspace fails", - "NCMP dataspace does not exist"); + "NCMP dataspace does not exist"); } } } @@ -139,7 +139,7 @@ public class SubscriptionModelLoader implements ModelLoader { */ @Override public boolean createAnchor(final String dataspaceName, final String schemaSetName, - final String anchorName) { + final String anchorName) { try { cpsAdminService.createAnchor(dataspaceName, schemaSetName, anchorName); } catch (final AlreadyDefinedException exception) { -- cgit 1.2.3-korg