diff options
26 files changed, 838 insertions, 127 deletions
diff --git a/cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json b/cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json new file mode 100644 index 0000000000..34970ac1c3 --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json @@ -0,0 +1,81 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events:avc-subscription-event-outcome:v1", + "$ref": "#/definitions/SubscriptionEventOutcome", + "definitions": { + "SubscriptionEventOutcome": { + "description": "The payload for avc subscription event outcome message.", + "type": "object", + "javaType" : "org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome", + "properties": { + "version": { + "description": "The outcome event type version", + "type": "string" + }, + "eventType": { + "description": "The event type", + "type": "string", + "enum": [ + "COMPLETE_OUTCOME", + "PARTIAL_OUTCOME" + ] + }, + "event": { + "$ref": "#/definitions/event" + } + }, + "required": [ + "version", + "eventType", + "event" + ] + }, + "event": { + "description": "The event content for outcome message.", + "type": "object", + "javaType": "InnerSubscriptionEventOutcome", + "properties": { + "subscription": { + "description": "The subscription details.", + "type": "object", + "properties": { + "clientID": { + "description": "The clientID", + "type": "string" + }, + "name": { + "description": "The name of the subscription", + "type": "string" + } + }, + "required": [ + "clientID", + "name" + ] + }, + "predicates": { + "description": "Additional values to be added into the subscription outcome", + "type": "object", + "properties": { + "rejectedTargets": { + "description": "Rejected CM Handles to be responded by the subscription", + "type": "array" + }, + "acceptedTargets": { + "description": "Accepted CM Handles to be responded by the subscription", + "type": "array" + }, + "pendingTargets": { + "description": "Pending CM Handles to be responded by the subscription", + "type": "array" + } + } + } + }, + "required": [ + "subscription", + "predicates" + ] + } + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java index 088e96564c..2c7659949c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; +import org.apache.kafka.common.header.Header; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; @@ -41,10 +42,14 @@ public class BatchRecordFilterStrategy { @Bean public RecordFilterStrategy<Object, Object> filterBatchDataResponseEvent() { return consumedRecord -> { - final String headerValue = SerializationUtils - .deserialize(consumedRecord.headers().lastHeader("eventType").value()); - return !(headerValue != null - && headerValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); + if (eventTypeHeader != null) { + final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); + return !(eventTypeHeaderValue != null + && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + } else { + return true; + } }; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java index 443ebc627a..d2f16a71d4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java @@ -33,8 +33,10 @@ import org.springframework.context.annotation.Configuration; @Configuration public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig { + public static final int SUBSCRIPTION_FORWARD_STARTED_TTL_SECS = 600; + private static final MapConfig forwardedSubscriptionEventCacheMapConfig = - createMapConfig("forwardedSubscriptionEventCacheMapConfig"); + createMapConfig("forwardedSubscriptionEventCacheMapConfig"); /** * Distributed instance of forwarded subscription information cache that contains subscription event diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java index e7edecfacc..9c7b79f733 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java @@ -24,27 +24,35 @@ import com.hazelcast.map.IMap; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome; @Slf4j @RequiredArgsConstructor public class ResponseTimeoutTask implements Runnable { private final IMap<String, Set<String>> forwardedSubscriptionEventCache; - private final String subscriptionEventId; + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; + private final String subscriptionClientId; + private final String subscriptionName; @Override public void run() { + + try { + generateAndSendResponse(); + } catch (final Exception exception) { + log.info("Caught exception in Runnable for ResponseTimeoutTask. StackTrace: {}", + exception.toString()); + } + + } + + private void generateAndSendResponse() { + final String subscriptionEventId = subscriptionClientId + subscriptionName; if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); - if (dmiNames.isEmpty()) { - //TODO full outcome response here - log.info("placeholder to create full outcome response for subscriptionEventId: {}.", - subscriptionEventId); - } else { - //TODO partial outcome response here - log.info("placeholder to create partial outcome response for subscriptionEventId: {}.", - subscriptionEventId); - } + subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName, + dmiNames.isEmpty()); forwardedSubscriptionEventCache.remove(subscriptionEventId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java index c173862476..eb3daeb4da 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java @@ -22,9 +22,12 @@ package org.onap.cps.ncmp.api.impl.event.avc; import com.hazelcast.map.IMap; import java.util.Set; +import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper; +import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; @@ -38,13 +41,9 @@ import org.springframework.stereotype.Component; public class SubscriptionEventResponseConsumer { private final IMap<String, Set<String>> forwardedSubscriptionEventCache; - private final SubscriptionPersistence subscriptionPersistence; - private final SubscriptionEventResponseMapper subscriptionEventResponseMapper; - - @Value("${app.ncmp.avc.subscription-outcome-topic}") - private String subscriptionOutcomeEventTopic; + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; @Value("${notification.enabled:true}") private boolean notificationFeatureEnabled; @@ -55,30 +54,36 @@ public class SubscriptionEventResponseConsumer { /** * Consume subscription response event. * - * @param subscriptionEventResponse the event to be consumed + * @param subscriptionEventResponseConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) - public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) { - log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId()); - final String subscriptionEventId = subscriptionEventResponse.getClientId() - + subscriptionEventResponse.getSubscriptionName(); - final boolean createOutcomeResponse; + public void consumeSubscriptionEventResponse( + final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) { + final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value(); + final String clientId = subscriptionEventResponse.getClientId(); + log.info("subscription event response of clientId: {} is received.", clientId); + final String subscriptionName = subscriptionEventResponse.getSubscriptionName(); + final String subscriptionEventId = clientId + subscriptionName; + boolean isFullOutcomeResponse = false; if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName()); - createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); - if (createOutcomeResponse) { + final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + + dmiNames.remove(subscriptionEventResponse.getDmiName()); + forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); + + if (isFullOutcomeResponse) { forwardedSubscriptionEventCache.remove(subscriptionEventId); } - } else { - createOutcomeResponse = true; } if (subscriptionModelLoaderEnabled) { updateSubscriptionEvent(subscriptionEventResponse); } - if (createOutcomeResponse && notificationFeatureEnabled) { - log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId); - //TODO Create outcome response + if (isFullOutcomeResponse && notificationFeatureEnabled) { + subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName, + isFullOutcomeResponse); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java new file mode 100644 index 0000000000..2466bc36ec --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; + +@Mapper(componentModel = "spring") +public interface SubscriptionOutcomeMapper { + + @Mapping(source = "clientId", target = "event.subscription.clientID") + @Mapping(source = "subscriptionName", target = "event.subscription.name") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets", + qualifiedByName = "mapStatusToCmHandleRejected") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets", + qualifiedByName = "mapStatusToCmHandleAccepted") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets", + qualifiedByName = "mapStatusToCmHandlePending") + SubscriptionEventOutcome toSubscriptionEventOutcome( + SubscriptionEventResponse subscriptionEventResponse); + + /** + * Maps StatusToCMHandle to list of TargetCmHandle rejected. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandleRejected") + default List<Object> mapStatusToCmHandleRejected(Map<String, SubscriptionStatus> targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue())) + .map(target -> target.getKey()) + .collect(Collectors.toList()); + } + + /** + * Maps StatusToCMHandle to list of TargetCmHandle accepted. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandleAccepted") + default List<Object> mapStatusToCmHandleAccepted(Map<String, SubscriptionStatus> targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue())) + .map(target -> target.getKey()) + .collect(Collectors.toList()); + } + + /** + * Maps StatusToCMHandle to list of TargetCmHandle pending. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandlePending") + default List<Object> mapStatusToCmHandlePending(Map<String, SubscriptionStatus> targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue())) + .map(target -> target.getKey()) + .collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index b0b091a2f6..d92316dc58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -50,6 +50,7 @@ public class EventsPublisher<T> { * @param topicName valid topic name * @param eventKey message key * @param event message payload + * @deprecated This method is not needed anymore since the use of headers will be in place. */ @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 3bf02f0b58..f37497abe6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -68,12 +68,13 @@ public class AvcEventConsumer { } private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String eventId = "eventId"; final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); - eventHeaders.remove("eventId"); + (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); + eventHeaders.remove(eventId); log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, mutatedEventId); - eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java index 7717db67a6..88b41d0075 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent; @@ -50,11 +51,13 @@ public class SubscriptionEventConsumer { /** * Consume the specified event. * - * @param subscriptionEvent the event to be consumed + * @param subscriptionEventConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"}) - public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + public void consumeSubscriptionEvent( + final ConsumerRecord<String, SubscriptionEvent> subscriptionEventConsumerRecord) { + final SubscriptionEvent subscriptionEvent = subscriptionEventConsumerRecord.value(); final InnerSubscriptionEvent event = subscriptionEvent.getEvent(); final String eventDatastore = event.getPredicates().getDatastore(); if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { @@ -71,7 +74,8 @@ public class SubscriptionEventConsumer { event.getSubscription().getClientID(), event.getSubscription().getName()); if (notificationFeatureEnabled) { - subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, + subscriptionEventConsumerRecord.headers()); } } } else { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 4654b148c6..19a0f12b0b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Headers; +import org.onap.cps.ncmp.api.impl.event.avc.ForwardedSubscriptionEventCacheConfig; import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; @@ -53,9 +55,8 @@ public class SubscriptionEventForwarder { private final InventoryPersistence inventoryPersistence; private final EventsPublisher<SubscriptionEvent> eventsPublisher; private final IMap<String, Set<String>> forwardedSubscriptionEventCache; - + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - @Value("${app.ncmp.avc.subscription-forward-topic-prefix}") private String dmiAvcSubscriptionTopicPrefix; @@ -67,7 +68,8 @@ public class SubscriptionEventForwarder { * * @param subscriptionEvent the event to be forwarded */ - public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, + final Headers eventHeaders) { final List<Object> cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); if (cmHandleTargets == null || cmHandleTargets.isEmpty() || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { @@ -84,36 +86,44 @@ public class SubscriptionEventForwarder { final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); if (dmisToRespond.isEmpty()) { - log.info("placeholder to create full outcome response for subscriptionEventId: {}.", - subscriptionEvent.getEvent().getSubscription().getClientID() - + subscriptionEvent.getEvent().getSubscription().getName()); - //TODO outcome response with no cmhandles + final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName, true); } else { startResponseTimeout(subscriptionEvent, dmisToRespond); - forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders); + } + } + + private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) { + final String subscriptionClientId = subscriptionEvent.getEvent().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + + forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + final ResponseTimeoutTask responseTimeoutTask = + new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome, + subscriptionClientId, subscriptionName); + try { + executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); + } catch (final RuntimeException ex) { + log.info("Caught exception in ScheduledExecutorService for ResponseTimeoutTask. StackTrace: {}", + ex.toString()); } } private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap, - final SubscriptionEvent subscriptionEvent) { + final SubscriptionEvent subscriptionEvent, + final Headers eventHeaders) { dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); final String eventKey = createEventKey(subscriptionEvent, dmiName); final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; - eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, subscriptionEvent); + eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, eventHeaders, subscriptionEvent); }); } - private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) { - final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID() - + subscriptionEvent.getEvent().getSubscription().getName(); - - forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond); - final ResponseTimeoutTask responseTimeoutTask = - new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId); - executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); - } - private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { return subscriptionEvent.getEvent().getSubscription().getClientID() + "-" @@ -121,5 +131,4 @@ public class SubscriptionEventForwarder { + "-" + dmiName; } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java new file mode 100644 index 0000000000..ade3f22f4b --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java @@ -0,0 +1,135 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.onap.cps.ncmp.api.impl.event.avc.SubscriptionOutcomeMapper; +import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; +import org.onap.cps.spi.model.DataNode; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventResponseOutcome { + + private final SubscriptionPersistence subscriptionPersistence; + + private final EventsPublisher<SubscriptionEventOutcome> outcomeEventsPublisher; + + private final SubscriptionOutcomeMapper subscriptionOutcomeMapper; + + @Value("${app.ncmp.avc.subscription-outcome-topic:cm-avc-subscription-response}") + private String subscriptionOutcomeEventTopic; + + /** + * This is for construction of outcome message to be published for client apps. + * + * @param subscriptionClientId client id of the subscription. + * @param subscriptionName name of the subscription. + * @param isFullOutcomeResponse the flag to decide on complete or partial response to be generated. + */ + public void sendResponse(final String subscriptionClientId, final String subscriptionName, + final boolean isFullOutcomeResponse) { + final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse( + subscriptionClientId, subscriptionName, isFullOutcomeResponse); + final Headers headers = new RecordHeaders(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic, + subscriptionEventId, headers, subscriptionEventOutcome); + } + + private SubscriptionEventOutcome generateResponse(final String subscriptionClientId, final String subscriptionName, + final boolean isFullOutcomeResponse) { + final Collection<DataNode> dataNodes = subscriptionPersistence.getDataNodesForSubscriptionEvent(); + final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes); + final List<Collection<Serializable>> cmHandleIdToStatus = + DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves); + return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName, + isFullOutcomeResponse); + } + + + private SubscriptionEventOutcome formSubscriptionOutcomeMessage( + final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId, + final String subscriptionName, final boolean isFullOutcomeResponse) { + + final SubscriptionEventResponse subscriptionEventResponse = toSubscriptionEventResponse( + cmHandleIdToStatus, subscriptionClientId, subscriptionName); + + final SubscriptionEventOutcome subscriptionEventOutcome = + subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse); + + if (isFullOutcomeResponse) { + subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME); + } else { + subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME); + } + + return subscriptionEventOutcome; + } + + private SubscriptionEventResponse toSubscriptionEventResponse( + final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId, + final String subscriptionName) { + final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = new HashMap<>(); + final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse(); + subscriptionEventResponse.setClientId(subscriptionClientId); + subscriptionEventResponse.setSubscriptionName(subscriptionName); + + for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) { + final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator(); + while (bucketIterator.hasNext()) { + final String item = (String) bucketIterator.next(); + if ("PENDING".equals(item)) { + cmHandleIdToStatusMap.put((String) bucketIterator.next(), + SubscriptionStatus.PENDING); + } + if ("REJECTED".equals(item)) { + cmHandleIdToStatusMap.put((String) bucketIterator.next(), + SubscriptionStatus.REJECTED); + } + if ("ACCEPTED".equals(item)) { + cmHandleIdToStatusMap.put((String) bucketIterator.next(), + SubscriptionStatus.ACCEPTED); + } + } + } + subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap); + + return subscriptionEventResponse; + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java index 16d9b80f8b..f240c4510d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java @@ -20,7 +20,9 @@ package org.onap.cps.ncmp.api.impl.subscriptions; +import java.util.Collection; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.spi.model.DataNode; public interface SubscriptionPersistence { @@ -31,4 +33,10 @@ public interface SubscriptionPersistence { */ void saveSubscriptionEvent(YangModelSubscriptionEvent yangModelSubscriptionEvent); + /** + * Get data nodes. + * + * @return the DataNode as collection. + */ + Collection<DataNode> getDataNodesForSubscriptionEvent(); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java index e8de083fde..9a063d6dfd 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java @@ -51,27 +51,32 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence { createSubscriptionEventJsonData(jsonObjectMapper.asJsonString(yangModelSubscriptionEvent)); final Collection<DataNode> dataNodes = cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); - final Optional<DataNode> optional = dataNodes.stream().findFirst(); - if (optional.isPresent() && optional.get().getChildDataNodes().isEmpty()) { - saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, false); - } else { - saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, true); - } + final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst(); + final boolean isCreateOperation = + dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty(); + saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, isCreateOperation); } private void saveOrUpdateSubscriptionEventYangModel(final String subscriptionEventJsonData, - final boolean isDataNodeExist) { - if (isDataNodeExist) { - log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData); - cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, - SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP); - } else { + final boolean isCreateOperation) { + if (isCreateOperation) { log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData); cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP); + } else { + log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData); + cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP); } } + @Override + public Collection<DataNode> getDataNodesForSubscriptionEvent() { + return cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME, + SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, + FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); + } + private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) { return "{\"subscription\":[" + yangModelSubscriptionAsJson + "]}"; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java new file mode 100644 index 0000000000..2fec59b736 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.onap.cps.spi.model.DataNode; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DataNodeHelper { + + /** + * The nested DataNode object is being flattened. + * + * @param dataNode object. + * @return DataNode as stream. + */ + public static Stream<DataNode> flatten(final DataNode dataNode) { + return Stream.concat(Stream.of(dataNode), + dataNode.getChildDataNodes().stream().flatMap(DataNodeHelper::flatten)); + } + + /** + * The leaves for each DataNode is listed as map. + * + * @param dataNodes as collection. + * @return list of map for the all leaves. + */ + public static List<Map<String, Serializable>> getDataNodeLeaves(final Collection<DataNode> dataNodes) { + return dataNodes.stream() + .flatMap(DataNodeHelper::flatten) + .map(node -> node.getLeaves()) + .collect(Collectors.toList()); + } + + /** + * The cm handle and status is listed as a collection. + * + * @param dataNodeLeaves as a list of map. + * @return list of collection containing cm handle id and statuses. + */ + public static List<Collection<Serializable>> getCmHandleIdToStatus( + final List<Map<String, Serializable>> dataNodeLeaves) { + return dataNodeLeaves.stream() + .map(target -> target.values()) + .filter(col -> col.contains("PENDING") + | col.contains("ACCEPTED") + | col.contains("REJECTED")) + .collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy index e9f66892cb..80c9b69c0b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy @@ -22,7 +22,9 @@ package org.onap.cps.ncmp.api.impl.event.avc import com.fasterxml.jackson.databind.ObjectMapper import com.hazelcast.map.IMap +import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper +import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistenceImpl import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.api.models.SubscriptionEventResponse @@ -34,24 +36,24 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec { IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>) def mockSubscriptionPersistence = Mock(SubscriptionPersistenceImpl) - def mockSubscriptionEventResponseMapper = Mock(SubscriptionEventResponseMapper) + def mockSubscriptionEventResponseMapper = Mock(SubscriptionEventResponseMapper) + def mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome) def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache, - mockSubscriptionPersistence, mockSubscriptionEventResponseMapper) + mockSubscriptionPersistence, mockSubscriptionEventResponseMapper, mockSubscriptionEventResponseOutcome) + def cmHandleToStatusMap = [CMHandle1: 'PENDING', CMHandle2: 'ACCEPTED'] as Map + def testEventReceived = new SubscriptionEventResponse(clientId: 'some-client-id', + subscriptionName: 'some-subscription-name', dmiName: 'some-dmi-name', cmHandleIdToStatus: cmHandleToStatusMap) + def consumerRecord = new ConsumerRecord<String, SubscriptionEventResponse>('topic-name', 0, 0, 'event-key', testEventReceived) def 'Consume Subscription Event Response where all DMIs have responded'() { - given: 'a subscription event response with a clientId, subscriptionName and dmiName' - def testEventReceived = new SubscriptionEventResponse() - testEventReceived.clientId = 'some-client-id' - testEventReceived.subscriptionName = 'some-subscription-name' - testEventReceived.dmiName = 'some-dmi-name' - and: 'notifications are enabled' + given: 'a subscription event response and notifications are enabled' objectUnderTest.notificationFeatureEnabled = true and: 'subscription model loader is enabled' objectUnderTest.subscriptionModelLoaderEnabled = true when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEventResponse(testEventReceived) + objectUnderTest.consumeSubscriptionEventResponse(consumerRecord) then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event' 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name'] as Set) @@ -59,20 +61,17 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec { 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> ([] as Set) and: 'the subscription event is removed from the map' 1 * mockForwardedSubscriptionEventCache.remove('some-client-idsome-subscription-name') + and: 'a response outcome has been created' + 1 * mockSubscriptionEventResponseOutcome.sendResponse('some-client-id', 'some-subscription-name', true) } def 'Consume Subscription Event Response where another DMI has not yet responded'() { - given: 'a subscription event response with a clientId, subscriptionName and dmiName' - def testEventReceived = new SubscriptionEventResponse() - testEventReceived.clientId = 'some-client-id' - testEventReceived.subscriptionName = 'some-subscription-name' - testEventReceived.dmiName = 'some-dmi-name' - and: 'notifications are enabled' + given: 'a subscription event response and notifications are enabled' objectUnderTest.notificationFeatureEnabled = true and: 'subscription model loader is enabled' objectUnderTest.subscriptionModelLoaderEnabled = true when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEventResponse(testEventReceived) + objectUnderTest.consumeSubscriptionEventResponse(consumerRecord) then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event' 1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name', 'non-responded-dmi'] as Set) @@ -80,5 +79,7 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec { 1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['non-responded-dmi'] as Set) and: 'the subscription event is not removed from the map' 0 * mockForwardedSubscriptionEventCache.remove(_) + and: 'a response outcome has not been created' + 0 * mockSubscriptionEventResponseOutcome.sendResponse(*_) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy new file mode 100644 index 0000000000..22067745f0 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc + +import com.fasterxml.jackson.databind.ObjectMapper +import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification + + +@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper]) +class SubscriptionOutcomeMapperSpec extends Specification { + + SubscriptionOutcomeMapper objectUnderTest = Mappers.getMapper(SubscriptionOutcomeMapper) + + @Autowired + JsonObjectMapper jsonObjectMapper + + def 'Map subscription event response to subscription event outcome'() { + given: 'a Subscription Response Event' + def jsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json') + def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEventResponse.class) + and: 'a Subscription Outcome Event' + def jsonDataOutcome = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json') + def testEventTarget = jsonObjectMapper.convertJsonString(jsonDataOutcome, SubscriptionEventOutcome.class) + when: 'the subscription response event is mapped to a subscription event outcome' + def result = objectUnderTest.toSubscriptionEventOutcome(testEventToMap) + result.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME) + then: 'the resulting subscription event outcome contains the correct clientId' + assert result == testEventTarget + } +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy index 7fb817bc9a..cde0d1fa00 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy @@ -51,9 +51,10 @@ class SubscriptionEventResponseMapperSpec extends Specification { and: 'subscription name' assert result.subscriptionName == "cm-subscription-001" and: 'predicate targets ' - assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle2"] + assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle3", "CMHandle4", "CMHandle5"] and: 'the status for these targets is set to expected values' - assert result.predicates.targetCmHandles.status == [SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED] + assert result.predicates.targetCmHandles.status == [SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED, + SubscriptionStatus.PENDING, SubscriptionStatus.PENDING] and: 'the topic is null' assert result.topic == null } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy index 243c31b39b..cccd61b716 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec @@ -48,30 +49,32 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec { given: 'an event with data category CM' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'notifications are enabled' objectUnderTest.notificationFeatureEnabled = true and: 'subscription model loader is enabled' objectUnderTest.subscriptionModelLoaderEnabled = true when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(testEventSent) + objectUnderTest.consumeSubscriptionEvent(consumerRecord) then: 'the event is mapped to a yangModelSubscription' 1 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent and: 'the event is persisted' 1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent) and: 'the event is forwarded' - 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent) + 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) } def 'Consume valid CM create message where notifications and model loader are disabled'() { given: 'an event with data category CM' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'notifications are disabled' objectUnderTest.notificationFeatureEnabled = false and: 'subscription model loader is disabled' objectUnderTest.subscriptionModelLoaderEnabled = false when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(testEventSent) + objectUnderTest.consumeSubscriptionEvent(consumerRecord) then: 'the event is not mapped to a yangModelSubscription' 0 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(*_) >> yangModelSubscriptionEvent and: 'the event is not persisted' @@ -84,10 +87,11 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'dataCategory is set to FM' testEventSent.getEvent().getDataType().setDataCategory("FM") when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(testEventSent) + objectUnderTest.consumeSubscriptionEvent(consumerRecord) then: 'no exception is thrown' noExceptionThrown() and: 'the event is not mapped to a yangModelSubscription' @@ -102,10 +106,11 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'datastore is set to a non passthrough datastore' testEventSent.getEvent().getPredicates().setDatastore("operational") when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(testEventSent) + objectUnderTest.consumeSubscriptionEvent(consumerRecord) then: 'an operation not yet supported exception is thrown' thrown(OperationNotYetSupportedException) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy index a3dec29ede..63ddcef554 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper import com.hazelcast.map.IMap +import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle import org.onap.cps.ncmp.api.inventory.InventoryPersistence @@ -35,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import spock.util.concurrent.BlockingVariable +import java.util.concurrent.TimeUnit + @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder]) class SubscriptionEventForwarderSpec extends MessagingBaseSpec { @@ -47,7 +50,8 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>) @SpringBean IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>) - + @SpringBean + SubscriptionEventResponseOutcome mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome) @Autowired JsonObjectMapper jsonObjectMapper @@ -55,6 +59,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'the InventoryPersistence returns private properties for the supplied CM Handles' 1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [ createYangModelCmHandleWithDmiProperty(1, 1,"shape","circle"), @@ -66,44 +71,46 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds' def block = new BlockingVariable<Object>(5) when: 'the valid event is forwarded' - objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) + objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) then: 'An asynchronous call is made to the blocking variable' block.get() then: 'the event is added to the forwarded subscription event cache' - 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set) + 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set, 600, TimeUnit.SECONDS) and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future' 1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1", - subscriptionEvent -> { + consumerRecord.headers(), subscriptionEvent -> { Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0) targets["CMHandle1"] == ["shape":"circle"] targets["CMHandle2"] == ["shape":"square"] } ) 1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2", - subscriptionEvent -> { + consumerRecord.headers(), subscriptionEvent -> { Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0) targets["CMHandle3"] == ["shape":"triangle"] } ) and: 'a separate thread has been created where the map is polled' 1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true - 1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap) + 1 * mockForwardedSubscriptionEventCache.get(_) >> DMINamesInMap + 1 * mockSubscriptionEventResponseOutcome.sendResponse(*_) and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable' 1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)} where: scenario | DMINamesInMap 'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set - 'all dmis have responded ' | [] as Set + 'all dmis have responded' | [] as Set } def 'Forward CM create subscription where target CM Handles are #scenario'() { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'the target CMHandles are set to #scenario' testEventSent.getEvent().getPredicates().setTargets(invalidTargets) when: 'the event is forwarded' - objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) + objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) then: 'an operation not yet supported exception is thrown' thrown(OperationNotYetSupportedException) where: @@ -117,6 +124,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'the InventoryPersistence returns no private properties for the supplied CM Handles' 1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [] and: 'the thread creation delay is reduced to 2 seconds for testing' @@ -124,7 +132,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds' def block = new BlockingVariable<Object>(5) when: 'the valid event is forwarded' - objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) + objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) then: 'the event is not added to the forwarded subscription event cache' 0 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set) and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy new file mode 100644 index 0000000000..53c5cd2c7b --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription + +import com.fasterxml.jackson.databind.ObjectMapper +import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.api.impl.event.avc.SubscriptionOutcomeMapper +import org.onap.cps.ncmp.api.impl.events.EventsPublisher +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence +import org.onap.cps.ncmp.api.impl.utils.DataNodeBaseSpec +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.testcontainers.shaded.org.bouncycastle.crypto.engines.EthereumIESEngine + +@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionOutcomeMapper, SubscriptionEventResponseOutcome]) +class SubscriptionEventResponseOutcomeSpec extends DataNodeBaseSpec { + + @Autowired + SubscriptionEventResponseOutcome objectUnderTest + + @SpringBean + SubscriptionPersistence mockSubscriptionPersistence = Mock(SubscriptionPersistence) + @SpringBean + EventsPublisher<SubscriptionEventOutcome> mockSubscriptionEventOutcomePublisher = Mock(EventsPublisher<SubscriptionEventOutcome>) + @SpringBean + SubscriptionOutcomeMapper subscriptionOutcomeMapper = Mappers.getMapper(SubscriptionOutcomeMapper) + + @Autowired + JsonObjectMapper jsonObjectMapper + + def 'Generate response via fetching data nodes from database.'() { + given: 'a db call to get data nodes for subscription event' + 1 * mockSubscriptionPersistence.getDataNodesForSubscriptionEvent() >> [dataNode4] + when: 'a response is generated' + def result = objectUnderTest.generateResponse('some-client-id', 'some-subscription-name', isFullOutcomeResponse) + then: 'the result will have the same values as same as in dataNode4' + result.eventType == eventType + result.getEvent().getSubscription().getClientID() == 'some-client-id' + result.getEvent().getSubscription().getName() == 'some-subscription-name' + result.getEvent().getPredicates().getPendingTargets() == ['CMHandle3'] + result.getEvent().getPredicates().getRejectedTargets() == ['CMHandle1'] + result.getEvent().getPredicates().getAcceptedTargets() == ['CMHandle2'] + where: 'the following values are used' + scenario | isFullOutcomeResponse || eventType + 'is full outcome' | true || SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME + 'is partial outcome' | false || SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME + } + + def 'Form subscription outcome message with a list of cm handle id to status mapping'() { + given: 'a list of collection including cm handle id to status' + def cmHandleIdToStatus = [['PENDING', 'CMHandle5'], ['PENDING', 'CMHandle4'], ['ACCEPTED', 'CMHandle1'], ['REJECTED', 'CMHandle3']] + and: 'an outcome event' + def jsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json') + def eventOutcome = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEventOutcome.class) + eventOutcome.setEventType(eventType) + when: 'a subscription outcome message formed' + def result = objectUnderTest.formSubscriptionOutcomeMessage(cmHandleIdToStatus, 'SCO-9989752', + 'cm-subscription-001', isFullOutcomeResponse) + result.getEvent().getPredicates().getPendingTargets().sort() + then: 'the result will be equal to event outcome' + result == eventOutcome + where: 'the following values are used' + scenario | isFullOutcomeResponse | eventType + 'is full outcome' | true | SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME + 'is partial outcome' | false | SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy index 2d3f8ac516..edc6e3bcf8 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy @@ -35,7 +35,7 @@ class LcmEventsServiceSpec extends Specification { def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper) def 'Create and Publish lcm event where events are #scenario'() { - given: 'a cm handle id and Lcm Event' + given: 'a cm handle id, Lcm Event, and headers' def cmHandleId = 'test-cm-handle-id' def eventId = UUID.randomUUID().toString() def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy index 75760091d3..a372abe6ff 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy @@ -36,23 +36,21 @@ class SubscriptionPersistenceSpec extends Specification { private static final String SUBSCRIPTION_REGISTRY_PARENT = "/subscription-registry"; def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) - def mockCpsDataService = Mock(CpsDataService) - def objectUnderTest = new SubscriptionPersistenceImpl(jsonObjectMapper, mockCpsDataService) + def predicates = new YangModelSubscriptionEvent.Predicates(datastore: 'some-datastore', + targetCmHandles: [new YangModelSubscriptionEvent.TargetCmHandle('cmhandle1'), + new YangModelSubscriptionEvent.TargetCmHandle('cmhandle2')]) + def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id', + subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates) + def 'save a subscription event' () { - given: 'a yang model subscription event' - def predicates = new YangModelSubscriptionEvent.Predicates(datastore: 'some-datastore', - targetCmHandles: [new YangModelSubscriptionEvent.TargetCmHandle('cmhandle1'), - new YangModelSubscriptionEvent.TargetCmHandle('cmhandle2')]) - def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id', - subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates) - and: 'a data node that does not exist in db' - def dataNodeNonExist = new DataNodeBuilder().withDataspace('NCMP-Admin') + given: 'a data node that does not exist in db' + def blankDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin') .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry').build() and: 'cps data service return non existing data node' - mockCpsDataService.getDataNodes(*_) >> [dataNodeNonExist] + mockCpsDataService.getDataNodes(*_) >> [blankDataNode] when: 'the yangModelSubscriptionEvent is saved into db' objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent) then: 'the cpsDataService save operation is called with the correct data' @@ -66,20 +64,14 @@ class SubscriptionPersistenceSpec extends Specification { } def 'update a subscription event' () { - given: 'a yang model subscription event' - def predicates = new YangModelSubscriptionEvent.Predicates(datastore: 'some-datastore', - targetCmHandles: [new YangModelSubscriptionEvent.TargetCmHandle('cmhandle1'), - new YangModelSubscriptionEvent.TargetCmHandle('cmhandle2')]) - def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id', - subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates) - and: 'a data node exist in db' + given: 'a data node exist in db' def childDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin') .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription').build() - def dataNodeExist = new DataNodeBuilder().withDataspace('NCMP-Admin') + def engagedDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin') .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry') .withChildDataNodes([childDataNode]).build() and: 'cps data service return existing data node' - mockCpsDataService.getDataNodes(*_) >> [dataNodeExist] + mockCpsDataService.getDataNodes(*_) >> [engagedDataNode] when: 'the yangModelSubscriptionEvent is saved into db' objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent) then: 'the cpsDataService update operation is called with the correct data' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy new file mode 100644 index 0000000000..7474166ffe --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils + +import org.onap.cps.spi.model.DataNodeBuilder +import spock.lang.Specification + +class DataNodeBaseSpec extends Specification { + + def leaves1 = [status:'PENDING', cmHandleId:'CMHandle3'] as Map + def dataNode1 = createDataNodeWithLeaves(leaves1) + + def leaves2 = [status:'ACCEPTED', cmHandleId:'CMHandle2'] as Map + def dataNode2 = createDataNodeWithLeaves(leaves2) + + def leaves3 = [status:'REJECTED', cmHandleId:'CMHandle1'] as Map + def dataNode3 = createDataNodeWithLeaves(leaves3) + + def leaves4 = [datastore:'passthrough-running'] as Map + def dataNode4 = createDataNodeWithLeavesAndChildDataNodes(leaves4, [dataNode1, dataNode2, dataNode3]) + + static def createDataNodeWithLeaves(leaves) { + return new DataNodeBuilder().withDataspace('NCMP-Admin') + .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription') + .withLeaves(leaves).build() + } + + static def createDataNodeWithLeavesAndChildDataNodes(leaves, dataNodes) { + return new DataNodeBuilder().withDataspace('NCMP-Admin') + .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription') + .withLeaves(leaves).withChildDataNodes(dataNodes) + .build() + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy new file mode 100644 index 0000000000..e527ae12bb --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils + +import org.onap.cps.spi.model.DataNodeBuilder + +class DataNodeHelperSpec extends DataNodeBaseSpec { + + def 'Get data node leaves as expected from a nested data node.'() { + given: 'a nested data node' + def dataNode = new DataNodeBuilder().withDataspace('NCMP-Admin') + .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription') + .withLeaves([clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001']) + .withChildDataNodes([dataNode4]).build() + when: 'the nested data node is flatten and retrieves the leaves ' + def result = DataNodeHelper.getDataNodeLeaves([dataNode]) + then: 'the result list size is 5' + result.size() == 5 + and: 'all the leaves result list are equal to given leaves of data nodes' + result[0] == [clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001'] + result[1] == [datastore:'passthrough-running'] + result[2] == [status:'PENDING', cmHandleId:'CMHandle3'] + result[3] == [status:'ACCEPTED', cmHandleId:'CMHandle2'] + result[4] == [status:'REJECTED', cmHandleId:'CMHandle1'] + } + + def 'Get cm handle id to status as expected from a nested data node.'() { + given: 'a nested data node' + def dataNode = new DataNodeBuilder().withDataspace('NCMP-Admin') + .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription') + .withLeaves([clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001']) + .withChildDataNodes([dataNode4]).build() + and: 'the nested data node is flatten and retrieves the leaves ' + def leaves = DataNodeHelper.getDataNodeLeaves([dataNode]) + when:'cm handle id to status is retrieved' + def result = DataNodeHelper.getCmHandleIdToStatus(leaves); + then: 'the result list size is 3' + result.size() == 3 + } +} diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json b/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json index b054362c93..3244f05a03 100644 --- a/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json +++ b/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json @@ -4,6 +4,8 @@ "dmiName": "ncmp-dmi-plugin", "cmHandleIdToStatus": { "CMHandle1": "ACCEPTED", - "CMHandle2": "REJECTED" + "CMHandle3": "REJECTED", + "CMHandle4": "PENDING", + "CMHandle5": "PENDING" } }
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json new file mode 100644 index 0000000000..6bfa36bf79 --- /dev/null +++ b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json @@ -0,0 +1,21 @@ +{ + "eventType": "PARTIAL_OUTCOME", + "event": { + "subscription": { + "clientID": "SCO-9989752", + "name": "cm-subscription-001" + }, + "predicates": { + "rejectedTargets": [ + "CMHandle3" + ], + "acceptedTargets": [ + "CMHandle1" + ], + "pendingTargets": [ + "CMHandle4", + "CMHandle5" + ] + } + } +}
\ No newline at end of file |