summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json81
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java28
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java88
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java1
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java7
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java10
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java51
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java135
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java29
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java75
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy33
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy55
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy5
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy15
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy26
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy89
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy32
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy52
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy58
-rw-r--r--cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json4
-rw-r--r--cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json21
-rw-r--r--docs/cps-path.rst5
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/CpsQueryServiceIntegrationSpec.groovy2
27 files changed, 834 insertions, 125 deletions
diff --git a/cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json b/cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json
new file mode 100644
index 000000000..34970ac1c
--- /dev/null
+++ b/cps-ncmp-events/src/main/resources/schemas/avc-subscription-event-outcome-v1.json
@@ -0,0 +1,81 @@
+{
+ "$schema": "https://json-schema.org/draft/2019-09/schema",
+ "$id": "urn:cps:org.onap.cps.ncmp.events:avc-subscription-event-outcome:v1",
+ "$ref": "#/definitions/SubscriptionEventOutcome",
+ "definitions": {
+ "SubscriptionEventOutcome": {
+ "description": "The payload for avc subscription event outcome message.",
+ "type": "object",
+ "javaType" : "org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome",
+ "properties": {
+ "version": {
+ "description": "The outcome event type version",
+ "type": "string"
+ },
+ "eventType": {
+ "description": "The event type",
+ "type": "string",
+ "enum": [
+ "COMPLETE_OUTCOME",
+ "PARTIAL_OUTCOME"
+ ]
+ },
+ "event": {
+ "$ref": "#/definitions/event"
+ }
+ },
+ "required": [
+ "version",
+ "eventType",
+ "event"
+ ]
+ },
+ "event": {
+ "description": "The event content for outcome message.",
+ "type": "object",
+ "javaType": "InnerSubscriptionEventOutcome",
+ "properties": {
+ "subscription": {
+ "description": "The subscription details.",
+ "type": "object",
+ "properties": {
+ "clientID": {
+ "description": "The clientID",
+ "type": "string"
+ },
+ "name": {
+ "description": "The name of the subscription",
+ "type": "string"
+ }
+ },
+ "required": [
+ "clientID",
+ "name"
+ ]
+ },
+ "predicates": {
+ "description": "Additional values to be added into the subscription outcome",
+ "type": "object",
+ "properties": {
+ "rejectedTargets": {
+ "description": "Rejected CM Handles to be responded by the subscription",
+ "type": "array"
+ },
+ "acceptedTargets": {
+ "description": "Accepted CM Handles to be responded by the subscription",
+ "type": "array"
+ },
+ "pendingTargets": {
+ "description": "Pending CM Handles to be responded by the subscription",
+ "type": "array"
+ }
+ }
+ }
+ },
+ "required": [
+ "subscription",
+ "predicates"
+ ]
+ }
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java
index 443ebc627..d2f16a71d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java
@@ -33,8 +33,10 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig {
+ public static final int SUBSCRIPTION_FORWARD_STARTED_TTL_SECS = 600;
+
private static final MapConfig forwardedSubscriptionEventCacheMapConfig =
- createMapConfig("forwardedSubscriptionEventCacheMapConfig");
+ createMapConfig("forwardedSubscriptionEventCacheMapConfig");
/**
* Distributed instance of forwarded subscription information cache that contains subscription event
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java
index e7edecfac..9c7b79f73 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java
@@ -24,27 +24,35 @@ import com.hazelcast.map.IMap;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome;
@Slf4j
@RequiredArgsConstructor
public class ResponseTimeoutTask implements Runnable {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
- private final String subscriptionEventId;
+ private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
+ private final String subscriptionClientId;
+ private final String subscriptionName;
@Override
public void run() {
+
+ try {
+ generateAndSendResponse();
+ } catch (final Exception exception) {
+ log.info("Caught exception in Runnable for ResponseTimeoutTask. StackTrace: {}",
+ exception.toString());
+ }
+
+ }
+
+ private void generateAndSendResponse() {
+ final String subscriptionEventId = subscriptionClientId + subscriptionName;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
- if (dmiNames.isEmpty()) {
- //TODO full outcome response here
- log.info("placeholder to create full outcome response for subscriptionEventId: {}.",
- subscriptionEventId);
- } else {
- //TODO partial outcome response here
- log.info("placeholder to create partial outcome response for subscriptionEventId: {}.",
- subscriptionEventId);
- }
+ subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName,
+ dmiNames.isEmpty());
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java
index c17386247..eb3daeb4d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java
@@ -22,9 +22,12 @@ package org.onap.cps.ncmp.api.impl.event.avc;
import com.hazelcast.map.IMap;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper;
+import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
@@ -38,13 +41,9 @@ import org.springframework.stereotype.Component;
public class SubscriptionEventResponseConsumer {
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
-
private final SubscriptionPersistence subscriptionPersistence;
-
private final SubscriptionEventResponseMapper subscriptionEventResponseMapper;
-
- @Value("${app.ncmp.avc.subscription-outcome-topic}")
- private String subscriptionOutcomeEventTopic;
+ private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
@Value("${notification.enabled:true}")
private boolean notificationFeatureEnabled;
@@ -55,30 +54,36 @@ public class SubscriptionEventResponseConsumer {
/**
* Consume subscription response event.
*
- * @param subscriptionEventResponse the event to be consumed
+ * @param subscriptionEventResponseConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"})
- public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) {
- log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId());
- final String subscriptionEventId = subscriptionEventResponse.getClientId()
- + subscriptionEventResponse.getSubscriptionName();
- final boolean createOutcomeResponse;
+ public void consumeSubscriptionEventResponse(
+ final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) {
+ final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value();
+ final String clientId = subscriptionEventResponse.getClientId();
+ log.info("subscription event response of clientId: {} is received.", clientId);
+ final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
+ final String subscriptionEventId = clientId + subscriptionName;
+ boolean isFullOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
- forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName());
- createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
- if (createOutcomeResponse) {
+ final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
+
+ dmiNames.remove(subscriptionEventResponse.getDmiName());
+ forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
+ ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
+ isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
+
+ if (isFullOutcomeResponse) {
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
- } else {
- createOutcomeResponse = true;
}
if (subscriptionModelLoaderEnabled) {
updateSubscriptionEvent(subscriptionEventResponse);
}
- if (createOutcomeResponse && notificationFeatureEnabled) {
- log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId);
- //TODO Create outcome response
+ if (isFullOutcomeResponse && notificationFeatureEnabled) {
+ subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName,
+ isFullOutcomeResponse);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java
new file mode 100644
index 000000000..2466bc36e
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.Named;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome;
+
+@Mapper(componentModel = "spring")
+public interface SubscriptionOutcomeMapper {
+
+ @Mapping(source = "clientId", target = "event.subscription.clientID")
+ @Mapping(source = "subscriptionName", target = "event.subscription.name")
+ @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets",
+ qualifiedByName = "mapStatusToCmHandleRejected")
+ @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets",
+ qualifiedByName = "mapStatusToCmHandleAccepted")
+ @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets",
+ qualifiedByName = "mapStatusToCmHandlePending")
+ SubscriptionEventOutcome toSubscriptionEventOutcome(
+ SubscriptionEventResponse subscriptionEventResponse);
+
+ /**
+ * Maps StatusToCMHandle to list of TargetCmHandle rejected.
+ *
+ * @param targets as a map
+ * @return TargetCmHandle list
+ */
+ @Named("mapStatusToCmHandleRejected")
+ default List<Object> mapStatusToCmHandleRejected(Map<String, SubscriptionStatus> targets) {
+ return targets.entrySet()
+ .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue()))
+ .map(target -> target.getKey())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Maps StatusToCMHandle to list of TargetCmHandle accepted.
+ *
+ * @param targets as a map
+ * @return TargetCmHandle list
+ */
+ @Named("mapStatusToCmHandleAccepted")
+ default List<Object> mapStatusToCmHandleAccepted(Map<String, SubscriptionStatus> targets) {
+ return targets.entrySet()
+ .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue()))
+ .map(target -> target.getKey())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Maps StatusToCMHandle to list of TargetCmHandle pending.
+ *
+ * @param targets as a map
+ * @return TargetCmHandle list
+ */
+ @Named("mapStatusToCmHandlePending")
+ default List<Object> mapStatusToCmHandlePending(Map<String, SubscriptionStatus> targets) {
+ return targets.entrySet()
+ .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue()))
+ .map(target -> target.getKey())
+ .collect(Collectors.toList());
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index b0b091a2f..d92316dc5 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -50,6 +50,7 @@ public class EventsPublisher<T> {
* @param topicName valid topic name
* @param eventKey message key
* @param event message payload
+ * @deprecated This method is not needed anymore since the use of headers will be in place.
*/
@Deprecated
public void publishEvent(final String topicName, final String eventKey, final T event) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
index 3bf02f0b5..f37497abe 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
@@ -68,12 +68,13 @@ public class AvcEventConsumer {
}
private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) {
+ final String eventId = "eventId";
final String existingEventId =
- (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value());
- eventHeaders.remove("eventId");
+ (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value());
+ eventHeaders.remove(eventId);
log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId,
mutatedEventId);
- eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId)));
+ eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId)));
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
index 7717db67a..88b41d007 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent;
@@ -50,11 +51,13 @@ public class SubscriptionEventConsumer {
/**
* Consume the specified event.
*
- * @param subscriptionEvent the event to be consumed
+ * @param subscriptionEventConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"})
- public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ public void consumeSubscriptionEvent(
+ final ConsumerRecord<String, SubscriptionEvent> subscriptionEventConsumerRecord) {
+ final SubscriptionEvent subscriptionEvent = subscriptionEventConsumerRecord.value();
final InnerSubscriptionEvent event = subscriptionEvent.getEvent();
final String eventDatastore = event.getPredicates().getDatastore();
if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) {
@@ -71,7 +74,8 @@ public class SubscriptionEventConsumer {
event.getSubscription().getClientID(),
event.getSubscription().getName());
if (notificationFeatureEnabled) {
- subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent);
+ subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent,
+ subscriptionEventConsumerRecord.headers());
}
}
} else {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 4654b148c..19a0f12b0 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.header.Headers;
+import org.onap.cps.ncmp.api.impl.event.avc.ForwardedSubscriptionEventCacheConfig;
import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
@@ -53,9 +55,8 @@ public class SubscriptionEventForwarder {
private final InventoryPersistence inventoryPersistence;
private final EventsPublisher<SubscriptionEvent> eventsPublisher;
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
-
+ private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-
@Value("${app.ncmp.avc.subscription-forward-topic-prefix}")
private String dmiAvcSubscriptionTopicPrefix;
@@ -67,7 +68,8 @@ public class SubscriptionEventForwarder {
*
* @param subscriptionEvent the event to be forwarded
*/
- public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent,
+ final Headers eventHeaders) {
final List<Object> cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets();
if (cmHandleTargets == null || cmHandleTargets.isEmpty()
|| cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) {
@@ -84,36 +86,44 @@ public class SubscriptionEventForwarder {
final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
if (dmisToRespond.isEmpty()) {
- log.info("placeholder to create full outcome response for subscriptionEventId: {}.",
- subscriptionEvent.getEvent().getSubscription().getClientID()
- + subscriptionEvent.getEvent().getSubscription().getName());
- //TODO outcome response with no cmhandles
+ final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID();
+ final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName();
+ subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName, true);
} else {
startResponseTimeout(subscriptionEvent, dmisToRespond);
- forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent);
+ forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders);
+ }
+ }
+
+ private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
+ final String subscriptionClientId = subscriptionEvent.getEvent().getSubscription().getClientID();
+ final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName();
+ final String subscriptionEventId = subscriptionClientId + subscriptionName;
+
+ forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond,
+ ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
+ final ResponseTimeoutTask responseTimeoutTask =
+ new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome,
+ subscriptionClientId, subscriptionName);
+ try {
+ executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
+ } catch (final RuntimeException ex) {
+ log.info("Caught exception in ScheduledExecutorService for ResponseTimeoutTask. StackTrace: {}",
+ ex.toString());
}
}
private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap,
- final SubscriptionEvent subscriptionEvent) {
+ final SubscriptionEvent subscriptionEvent,
+ final Headers eventHeaders) {
dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> {
subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap));
final String eventKey = createEventKey(subscriptionEvent, dmiName);
final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName;
- eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, subscriptionEvent);
+ eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, eventHeaders, subscriptionEvent);
});
}
- private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) {
- final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID()
- + subscriptionEvent.getEvent().getSubscription().getName();
-
- forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond);
- final ResponseTimeoutTask responseTimeoutTask =
- new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId);
- executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS);
- }
-
private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) {
return subscriptionEvent.getEvent().getSubscription().getClientID()
+ "-"
@@ -121,5 +131,4 @@ public class SubscriptionEventForwarder {
+ "-"
+ dmiName;
}
-
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
new file mode 100644
index 000000000..ade3f22f4
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
@@ -0,0 +1,135 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.onap.cps.ncmp.api.impl.event.avc.SubscriptionOutcomeMapper;
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class SubscriptionEventResponseOutcome {
+
+ private final SubscriptionPersistence subscriptionPersistence;
+
+ private final EventsPublisher<SubscriptionEventOutcome> outcomeEventsPublisher;
+
+ private final SubscriptionOutcomeMapper subscriptionOutcomeMapper;
+
+ @Value("${app.ncmp.avc.subscription-outcome-topic:cm-avc-subscription-response}")
+ private String subscriptionOutcomeEventTopic;
+
+ /**
+ * This is for construction of outcome message to be published for client apps.
+ *
+ * @param subscriptionClientId client id of the subscription.
+ * @param subscriptionName name of the subscription.
+ * @param isFullOutcomeResponse the flag to decide on complete or partial response to be generated.
+ */
+ public void sendResponse(final String subscriptionClientId, final String subscriptionName,
+ final boolean isFullOutcomeResponse) {
+ final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse(
+ subscriptionClientId, subscriptionName, isFullOutcomeResponse);
+ final Headers headers = new RecordHeaders();
+ final String subscriptionEventId = subscriptionClientId + subscriptionName;
+ outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic,
+ subscriptionEventId, headers, subscriptionEventOutcome);
+ }
+
+ private SubscriptionEventOutcome generateResponse(final String subscriptionClientId, final String subscriptionName,
+ final boolean isFullOutcomeResponse) {
+ final Collection<DataNode> dataNodes = subscriptionPersistence.getDataNodesForSubscriptionEvent();
+ final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
+ final List<Collection<Serializable>> cmHandleIdToStatus =
+ DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
+ return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName,
+ isFullOutcomeResponse);
+ }
+
+
+ private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
+ final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
+ final String subscriptionName, final boolean isFullOutcomeResponse) {
+
+ final SubscriptionEventResponse subscriptionEventResponse = toSubscriptionEventResponse(
+ cmHandleIdToStatus, subscriptionClientId, subscriptionName);
+
+ final SubscriptionEventOutcome subscriptionEventOutcome =
+ subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse);
+
+ if (isFullOutcomeResponse) {
+ subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME);
+ } else {
+ subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME);
+ }
+
+ return subscriptionEventOutcome;
+ }
+
+ private SubscriptionEventResponse toSubscriptionEventResponse(
+ final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
+ final String subscriptionName) {
+ final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = new HashMap<>();
+ final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
+ subscriptionEventResponse.setClientId(subscriptionClientId);
+ subscriptionEventResponse.setSubscriptionName(subscriptionName);
+
+ for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) {
+ final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator();
+ while (bucketIterator.hasNext()) {
+ final String item = (String) bucketIterator.next();
+ if ("PENDING".equals(item)) {
+ cmHandleIdToStatusMap.put((String) bucketIterator.next(),
+ SubscriptionStatus.PENDING);
+ }
+ if ("REJECTED".equals(item)) {
+ cmHandleIdToStatusMap.put((String) bucketIterator.next(),
+ SubscriptionStatus.REJECTED);
+ }
+ if ("ACCEPTED".equals(item)) {
+ cmHandleIdToStatusMap.put((String) bucketIterator.next(),
+ SubscriptionStatus.ACCEPTED);
+ }
+ }
+ }
+ subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap);
+
+ return subscriptionEventResponse;
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
index 16d9b80f8..f240c4510 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
@@ -20,7 +20,9 @@
package org.onap.cps.ncmp.api.impl.subscriptions;
+import java.util.Collection;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
+import org.onap.cps.spi.model.DataNode;
public interface SubscriptionPersistence {
@@ -31,4 +33,10 @@ public interface SubscriptionPersistence {
*/
void saveSubscriptionEvent(YangModelSubscriptionEvent yangModelSubscriptionEvent);
+ /**
+ * Get data nodes.
+ *
+ * @return the DataNode as collection.
+ */
+ Collection<DataNode> getDataNodesForSubscriptionEvent();
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
index e8de083fd..9a063d6df 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
@@ -51,27 +51,32 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
createSubscriptionEventJsonData(jsonObjectMapper.asJsonString(yangModelSubscriptionEvent));
final Collection<DataNode> dataNodes = cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME,
SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
- final Optional<DataNode> optional = dataNodes.stream().findFirst();
- if (optional.isPresent() && optional.get().getChildDataNodes().isEmpty()) {
- saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, false);
- } else {
- saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, true);
- }
+ final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst();
+ final boolean isCreateOperation =
+ dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty();
+ saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, isCreateOperation);
}
private void saveOrUpdateSubscriptionEventYangModel(final String subscriptionEventJsonData,
- final boolean isDataNodeExist) {
- if (isDataNodeExist) {
- log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData);
- cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
- } else {
+ final boolean isCreateOperation) {
+ if (isCreateOperation) {
log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData);
cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+ } else {
+ log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData);
+ cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
}
}
+ @Override
+ public Collection<DataNode> getDataNodesForSubscriptionEvent() {
+ return cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME,
+ SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT,
+ FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+ }
+
private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) {
return "{\"subscription\":[" + yangModelSubscriptionAsJson + "]}";
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
new file mode 100644
index 000000000..2fec59b73
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.onap.cps.spi.model.DataNode;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DataNodeHelper {
+
+ /**
+ * The nested DataNode object is being flattened.
+ *
+ * @param dataNode object.
+ * @return DataNode as stream.
+ */
+ public static Stream<DataNode> flatten(final DataNode dataNode) {
+ return Stream.concat(Stream.of(dataNode),
+ dataNode.getChildDataNodes().stream().flatMap(DataNodeHelper::flatten));
+ }
+
+ /**
+ * The leaves for each DataNode is listed as map.
+ *
+ * @param dataNodes as collection.
+ * @return list of map for the all leaves.
+ */
+ public static List<Map<String, Serializable>> getDataNodeLeaves(final Collection<DataNode> dataNodes) {
+ return dataNodes.stream()
+ .flatMap(DataNodeHelper::flatten)
+ .map(node -> node.getLeaves())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * The cm handle and status is listed as a collection.
+ *
+ * @param dataNodeLeaves as a list of map.
+ * @return list of collection containing cm handle id and statuses.
+ */
+ public static List<Collection<Serializable>> getCmHandleIdToStatus(
+ final List<Map<String, Serializable>> dataNodeLeaves) {
+ return dataNodeLeaves.stream()
+ .map(target -> target.values())
+ .filter(col -> col.contains("PENDING")
+ | col.contains("ACCEPTED")
+ | col.contains("REJECTED"))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy
index e9f66892c..80c9b69c0 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumerSpec.groovy
@@ -22,7 +22,9 @@ package org.onap.cps.ncmp.api.impl.event.avc
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper
+import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistenceImpl
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
@@ -34,24 +36,24 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
def mockSubscriptionPersistence = Mock(SubscriptionPersistenceImpl)
- def mockSubscriptionEventResponseMapper = Mock(SubscriptionEventResponseMapper)
+ def mockSubscriptionEventResponseMapper = Mock(SubscriptionEventResponseMapper)
+ def mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome)
def objectUnderTest = new SubscriptionEventResponseConsumer(mockForwardedSubscriptionEventCache,
- mockSubscriptionPersistence, mockSubscriptionEventResponseMapper)
+ mockSubscriptionPersistence, mockSubscriptionEventResponseMapper, mockSubscriptionEventResponseOutcome)
+ def cmHandleToStatusMap = [CMHandle1: 'PENDING', CMHandle2: 'ACCEPTED'] as Map
+ def testEventReceived = new SubscriptionEventResponse(clientId: 'some-client-id',
+ subscriptionName: 'some-subscription-name', dmiName: 'some-dmi-name', cmHandleIdToStatus: cmHandleToStatusMap)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEventResponse>('topic-name', 0, 0, 'event-key', testEventReceived)
def 'Consume Subscription Event Response where all DMIs have responded'() {
- given: 'a subscription event response with a clientId, subscriptionName and dmiName'
- def testEventReceived = new SubscriptionEventResponse()
- testEventReceived.clientId = 'some-client-id'
- testEventReceived.subscriptionName = 'some-subscription-name'
- testEventReceived.dmiName = 'some-dmi-name'
- and: 'notifications are enabled'
+ given: 'a subscription event response and notifications are enabled'
objectUnderTest.notificationFeatureEnabled = true
and: 'subscription model loader is enabled'
objectUnderTest.subscriptionModelLoaderEnabled = true
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(testEventReceived)
+ objectUnderTest.consumeSubscriptionEventResponse(consumerRecord)
then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name'] as Set)
@@ -59,20 +61,17 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> ([] as Set)
and: 'the subscription event is removed from the map'
1 * mockForwardedSubscriptionEventCache.remove('some-client-idsome-subscription-name')
+ and: 'a response outcome has been created'
+ 1 * mockSubscriptionEventResponseOutcome.sendResponse('some-client-id', 'some-subscription-name', true)
}
def 'Consume Subscription Event Response where another DMI has not yet responded'() {
- given: 'a subscription event response with a clientId, subscriptionName and dmiName'
- def testEventReceived = new SubscriptionEventResponse()
- testEventReceived.clientId = 'some-client-id'
- testEventReceived.subscriptionName = 'some-subscription-name'
- testEventReceived.dmiName = 'some-dmi-name'
- and: 'notifications are enabled'
+ given: 'a subscription event response and notifications are enabled'
objectUnderTest.notificationFeatureEnabled = true
and: 'subscription model loader is enabled'
objectUnderTest.subscriptionModelLoaderEnabled = true
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEventResponse(testEventReceived)
+ objectUnderTest.consumeSubscriptionEventResponse(consumerRecord)
then: 'the forwarded subscription event cache returns only the received dmiName existing for the subscription create event'
1 * mockForwardedSubscriptionEventCache.containsKey('some-client-idsome-subscription-name') >> true
1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['some-dmi-name', 'non-responded-dmi'] as Set)
@@ -80,5 +79,7 @@ class SubscriptionEventResponseConsumerSpec extends MessagingBaseSpec {
1 * mockForwardedSubscriptionEventCache.get('some-client-idsome-subscription-name') >> (['non-responded-dmi'] as Set)
and: 'the subscription event is not removed from the map'
0 * mockForwardedSubscriptionEventCache.remove(_)
+ and: 'a response outcome has not been created'
+ 0 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy
new file mode 100644
index 000000000..22067745f
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapperSpec.groovy
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event.avc
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.mapstruct.factory.Mappers
+import org.onap.cps.ncmp.api.models.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+
+@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper])
+class SubscriptionOutcomeMapperSpec extends Specification {
+
+ SubscriptionOutcomeMapper objectUnderTest = Mappers.getMapper(SubscriptionOutcomeMapper)
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ def 'Map subscription event response to subscription event outcome'() {
+ given: 'a Subscription Response Event'
+ def jsonData = TestUtils.getResourceFileContent('avcSubscriptionEventResponse.json')
+ def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEventResponse.class)
+ and: 'a Subscription Outcome Event'
+ def jsonDataOutcome = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json')
+ def testEventTarget = jsonObjectMapper.convertJsonString(jsonDataOutcome, SubscriptionEventOutcome.class)
+ when: 'the subscription response event is mapped to a subscription event outcome'
+ def result = objectUnderTest.toSubscriptionEventOutcome(testEventToMap)
+ result.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME)
+ then: 'the resulting subscription event outcome contains the correct clientId'
+ assert result == testEventTarget
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy
index 7fb817bc9..cde0d1fa0 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy
@@ -51,9 +51,10 @@ class SubscriptionEventResponseMapperSpec extends Specification {
and: 'subscription name'
assert result.subscriptionName == "cm-subscription-001"
and: 'predicate targets '
- assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle2"]
+ assert result.predicates.targetCmHandles.cmHandleId == ["CMHandle1", "CMHandle3", "CMHandle4", "CMHandle5"]
and: 'the status for these targets is set to expected values'
- assert result.predicates.targetCmHandles.status == [SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED]
+ assert result.predicates.targetCmHandles.status == [SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED,
+ SubscriptionStatus.PENDING, SubscriptionStatus.PENDING]
and: 'the topic is null'
assert result.topic == null
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy
index 243c31b39..cccd61b71 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -48,30 +49,32 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
given: 'an event with data category CM'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'notifications are enabled'
objectUnderTest.notificationFeatureEnabled = true
and: 'subscription model loader is enabled'
objectUnderTest.subscriptionModelLoaderEnabled = true
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'the event is mapped to a yangModelSubscription'
1 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent
and: 'the event is persisted'
1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent)
and: 'the event is forwarded'
- 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent)
+ 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
}
def 'Consume valid CM create message where notifications and model loader are disabled'() {
given: 'an event with data category CM'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'notifications are disabled'
objectUnderTest.notificationFeatureEnabled = false
and: 'subscription model loader is disabled'
objectUnderTest.subscriptionModelLoaderEnabled = false
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'the event is not mapped to a yangModelSubscription'
0 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(*_) >> yangModelSubscriptionEvent
and: 'the event is not persisted'
@@ -84,10 +87,11 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'dataCategory is set to FM'
testEventSent.getEvent().getDataType().setDataCategory("FM")
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'no exception is thrown'
noExceptionThrown()
and: 'the event is not mapped to a yangModelSubscription'
@@ -102,10 +106,11 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'datastore is set to a non passthrough datastore'
testEventSent.getEvent().getPredicates().setDatastore("operational")
when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(testEventSent)
+ objectUnderTest.consumeSubscriptionEvent(consumerRecord)
then: 'an operation not yet supported exception is thrown'
thrown(OperationNotYetSupportedException)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
index a3dec29ed..63ddcef55 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.map.IMap
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
@@ -35,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import spock.util.concurrent.BlockingVariable
+import java.util.concurrent.TimeUnit
+
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionEventForwarder])
class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
@@ -47,7 +50,8 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>)
@SpringBean
IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>)
-
+ @SpringBean
+ SubscriptionEventResponseOutcome mockSubscriptionEventResponseOutcome = Mock(SubscriptionEventResponseOutcome)
@Autowired
JsonObjectMapper jsonObjectMapper
@@ -55,6 +59,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'the InventoryPersistence returns private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> [
createYangModelCmHandleWithDmiProperty(1, 1,"shape","circle"),
@@ -66,44 +71,46 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
then: 'An asynchronous call is made to the blocking variable'
block.get()
then: 'the event is added to the forwarded subscription event cache'
- 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
+ 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set, 600, TimeUnit.SECONDS)
and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1",
- subscriptionEvent -> {
+ consumerRecord.headers(), subscriptionEvent -> {
Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
targets["CMHandle1"] == ["shape":"circle"]
targets["CMHandle2"] == ["shape":"square"]
}
)
1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2",
- subscriptionEvent -> {
+ consumerRecord.headers(), subscriptionEvent -> {
Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0)
targets["CMHandle3"] == ["shape":"triangle"]
}
)
and: 'a separate thread has been created where the map is polled'
1 * mockForwardedSubscriptionEventCache.containsKey("SCO-9989752cm-subscription-001") >> true
- 1 * mockForwardedSubscriptionEventCache.get(_) >> (DMINamesInMap)
+ 1 * mockForwardedSubscriptionEventCache.get(_) >> DMINamesInMap
+ 1 * mockSubscriptionEventResponseOutcome.sendResponse(*_)
and: 'the subscription id is removed from the event cache map returning the asynchronous blocking variable'
1 * mockForwardedSubscriptionEventCache.remove("SCO-9989752cm-subscription-001") >> {block.set(_)}
where:
scenario | DMINamesInMap
'there are dmis which have not responded' | ["DMIName1", "DMIName2"] as Set
- 'all dmis have responded ' | [] as Set
+ 'all dmis have responded' | [] as Set
}
def 'Forward CM create subscription where target CM Handles are #scenario'() {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'the target CMHandles are set to #scenario'
testEventSent.getEvent().getPredicates().setTargets(invalidTargets)
when: 'the event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
then: 'an operation not yet supported exception is thrown'
thrown(OperationNotYetSupportedException)
where:
@@ -117,6 +124,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
given: 'an event'
def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
+ def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent)
and: 'the InventoryPersistence returns no private properties for the supplied CM Handles'
1 * mockInventoryPersistence.getYangModelCmHandles(["CMHandle1", "CMHandle2", "CMHandle3"]) >> []
and: 'the thread creation delay is reduced to 2 seconds for testing'
@@ -124,7 +132,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec {
and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds'
def block = new BlockingVariable<Object>(5)
when: 'the valid event is forwarded'
- objectUnderTest.forwardCreateSubscriptionEvent(testEventSent)
+ objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers())
then: 'the event is not added to the forwarded subscription event cache'
0 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set)
and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy
new file mode 100644
index 000000000..53c5cd2c7
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcomeSpec.groovy
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.avcsubscription
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.mapstruct.factory.Mappers
+import org.onap.cps.ncmp.api.impl.event.avc.SubscriptionOutcomeMapper
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence
+import org.onap.cps.ncmp.api.impl.utils.DataNodeBaseSpec
+import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.testcontainers.shaded.org.bouncycastle.crypto.engines.EthereumIESEngine
+
+@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, SubscriptionOutcomeMapper, SubscriptionEventResponseOutcome])
+class SubscriptionEventResponseOutcomeSpec extends DataNodeBaseSpec {
+
+ @Autowired
+ SubscriptionEventResponseOutcome objectUnderTest
+
+ @SpringBean
+ SubscriptionPersistence mockSubscriptionPersistence = Mock(SubscriptionPersistence)
+ @SpringBean
+ EventsPublisher<SubscriptionEventOutcome> mockSubscriptionEventOutcomePublisher = Mock(EventsPublisher<SubscriptionEventOutcome>)
+ @SpringBean
+ SubscriptionOutcomeMapper subscriptionOutcomeMapper = Mappers.getMapper(SubscriptionOutcomeMapper)
+
+ @Autowired
+ JsonObjectMapper jsonObjectMapper
+
+ def 'Generate response via fetching data nodes from database.'() {
+ given: 'a db call to get data nodes for subscription event'
+ 1 * mockSubscriptionPersistence.getDataNodesForSubscriptionEvent() >> [dataNode4]
+ when: 'a response is generated'
+ def result = objectUnderTest.generateResponse('some-client-id', 'some-subscription-name', isFullOutcomeResponse)
+ then: 'the result will have the same values as same as in dataNode4'
+ result.eventType == eventType
+ result.getEvent().getSubscription().getClientID() == 'some-client-id'
+ result.getEvent().getSubscription().getName() == 'some-subscription-name'
+ result.getEvent().getPredicates().getPendingTargets() == ['CMHandle3']
+ result.getEvent().getPredicates().getRejectedTargets() == ['CMHandle1']
+ result.getEvent().getPredicates().getAcceptedTargets() == ['CMHandle2']
+ where: 'the following values are used'
+ scenario | isFullOutcomeResponse || eventType
+ 'is full outcome' | true || SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME
+ 'is partial outcome' | false || SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME
+ }
+
+ def 'Form subscription outcome message with a list of cm handle id to status mapping'() {
+ given: 'a list of collection including cm handle id to status'
+ def cmHandleIdToStatus = [['PENDING', 'CMHandle5'], ['PENDING', 'CMHandle4'], ['ACCEPTED', 'CMHandle1'], ['REJECTED', 'CMHandle3']]
+ and: 'an outcome event'
+ def jsonData = TestUtils.getResourceFileContent('avcSubscriptionOutcomeEvent.json')
+ def eventOutcome = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEventOutcome.class)
+ eventOutcome.setEventType(eventType)
+ when: 'a subscription outcome message formed'
+ def result = objectUnderTest.formSubscriptionOutcomeMessage(cmHandleIdToStatus, 'SCO-9989752',
+ 'cm-subscription-001', isFullOutcomeResponse)
+ result.getEvent().getPredicates().getPendingTargets().sort()
+ then: 'the result will be equal to event outcome'
+ result == eventOutcome
+ where: 'the following values are used'
+ scenario | isFullOutcomeResponse | eventType
+ 'is full outcome' | true | SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME
+ 'is partial outcome' | false | SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
index 2d3f8ac51..edc6e3bcf 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
@@ -35,7 +35,7 @@ class LcmEventsServiceSpec extends Specification {
def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper)
def 'Create and Publish lcm event where events are #scenario'() {
- given: 'a cm handle id and Lcm Event'
+ given: 'a cm handle id, Lcm Event, and headers'
def cmHandleId = 'test-cm-handle-id'
def eventId = UUID.randomUUID().toString()
def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
index 75760091d..a372abe6f 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
@@ -36,23 +36,21 @@ class SubscriptionPersistenceSpec extends Specification {
private static final String SUBSCRIPTION_REGISTRY_PARENT = "/subscription-registry";
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
-
def mockCpsDataService = Mock(CpsDataService)
-
def objectUnderTest = new SubscriptionPersistenceImpl(jsonObjectMapper, mockCpsDataService)
+ def predicates = new YangModelSubscriptionEvent.Predicates(datastore: 'some-datastore',
+ targetCmHandles: [new YangModelSubscriptionEvent.TargetCmHandle('cmhandle1'),
+ new YangModelSubscriptionEvent.TargetCmHandle('cmhandle2')])
+ def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id',
+ subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates)
+
def 'save a subscription event' () {
- given: 'a yang model subscription event'
- def predicates = new YangModelSubscriptionEvent.Predicates(datastore: 'some-datastore',
- targetCmHandles: [new YangModelSubscriptionEvent.TargetCmHandle('cmhandle1'),
- new YangModelSubscriptionEvent.TargetCmHandle('cmhandle2')])
- def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id',
- subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates)
- and: 'a data node that does not exist in db'
- def dataNodeNonExist = new DataNodeBuilder().withDataspace('NCMP-Admin')
+ given: 'a data node that does not exist in db'
+ def blankDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry').build()
and: 'cps data service return non existing data node'
- mockCpsDataService.getDataNodes(*_) >> [dataNodeNonExist]
+ mockCpsDataService.getDataNodes(*_) >> [blankDataNode]
when: 'the yangModelSubscriptionEvent is saved into db'
objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
then: 'the cpsDataService save operation is called with the correct data'
@@ -66,20 +64,14 @@ class SubscriptionPersistenceSpec extends Specification {
}
def 'update a subscription event' () {
- given: 'a yang model subscription event'
- def predicates = new YangModelSubscriptionEvent.Predicates(datastore: 'some-datastore',
- targetCmHandles: [new YangModelSubscriptionEvent.TargetCmHandle('cmhandle1'),
- new YangModelSubscriptionEvent.TargetCmHandle('cmhandle2')])
- def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id',
- subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates)
- and: 'a data node exist in db'
+ given: 'a data node exist in db'
def childDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription').build()
- def dataNodeExist = new DataNodeBuilder().withDataspace('NCMP-Admin')
+ def engagedDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry')
.withChildDataNodes([childDataNode]).build()
and: 'cps data service return existing data node'
- mockCpsDataService.getDataNodes(*_) >> [dataNodeExist]
+ mockCpsDataService.getDataNodes(*_) >> [engagedDataNode]
when: 'the yangModelSubscriptionEvent is saved into db'
objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
then: 'the cpsDataService update operation is called with the correct data'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy
new file mode 100644
index 000000000..7474166ff
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeBaseSpec.groovy
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils
+
+import org.onap.cps.spi.model.DataNodeBuilder
+import spock.lang.Specification
+
+class DataNodeBaseSpec extends Specification {
+
+ def leaves1 = [status:'PENDING', cmHandleId:'CMHandle3'] as Map
+ def dataNode1 = createDataNodeWithLeaves(leaves1)
+
+ def leaves2 = [status:'ACCEPTED', cmHandleId:'CMHandle2'] as Map
+ def dataNode2 = createDataNodeWithLeaves(leaves2)
+
+ def leaves3 = [status:'REJECTED', cmHandleId:'CMHandle1'] as Map
+ def dataNode3 = createDataNodeWithLeaves(leaves3)
+
+ def leaves4 = [datastore:'passthrough-running'] as Map
+ def dataNode4 = createDataNodeWithLeavesAndChildDataNodes(leaves4, [dataNode1, dataNode2, dataNode3])
+
+ static def createDataNodeWithLeaves(leaves) {
+ return new DataNodeBuilder().withDataspace('NCMP-Admin')
+ .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+ .withLeaves(leaves).build()
+ }
+
+ static def createDataNodeWithLeavesAndChildDataNodes(leaves, dataNodes) {
+ return new DataNodeBuilder().withDataspace('NCMP-Admin')
+ .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+ .withLeaves(leaves).withChildDataNodes(dataNodes)
+ .build()
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
new file mode 100644
index 000000000..e527ae12b
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils
+
+import org.onap.cps.spi.model.DataNodeBuilder
+
+class DataNodeHelperSpec extends DataNodeBaseSpec {
+
+ def 'Get data node leaves as expected from a nested data node.'() {
+ given: 'a nested data node'
+ def dataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
+ .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+ .withLeaves([clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001'])
+ .withChildDataNodes([dataNode4]).build()
+ when: 'the nested data node is flatten and retrieves the leaves '
+ def result = DataNodeHelper.getDataNodeLeaves([dataNode])
+ then: 'the result list size is 5'
+ result.size() == 5
+ and: 'all the leaves result list are equal to given leaves of data nodes'
+ result[0] == [clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001']
+ result[1] == [datastore:'passthrough-running']
+ result[2] == [status:'PENDING', cmHandleId:'CMHandle3']
+ result[3] == [status:'ACCEPTED', cmHandleId:'CMHandle2']
+ result[4] == [status:'REJECTED', cmHandleId:'CMHandle1']
+ }
+
+ def 'Get cm handle id to status as expected from a nested data node.'() {
+ given: 'a nested data node'
+ def dataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
+ .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+ .withLeaves([clientID:'SCO-9989752', isTagged:false, subscriptionName:'cm-subscription-001'])
+ .withChildDataNodes([dataNode4]).build()
+ and: 'the nested data node is flatten and retrieves the leaves '
+ def leaves = DataNodeHelper.getDataNodeLeaves([dataNode])
+ when:'cm handle id to status is retrieved'
+ def result = DataNodeHelper.getCmHandleIdToStatus(leaves);
+ then: 'the result list size is 3'
+ result.size() == 3
+ }
+}
diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json b/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json
index b054362c9..3244f05a0 100644
--- a/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json
+++ b/cps-ncmp-service/src/test/resources/avcSubscriptionEventResponse.json
@@ -4,6 +4,8 @@
"dmiName": "ncmp-dmi-plugin",
"cmHandleIdToStatus": {
"CMHandle1": "ACCEPTED",
- "CMHandle2": "REJECTED"
+ "CMHandle3": "REJECTED",
+ "CMHandle4": "PENDING",
+ "CMHandle5": "PENDING"
}
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json
new file mode 100644
index 000000000..6bfa36bf7
--- /dev/null
+++ b/cps-ncmp-service/src/test/resources/avcSubscriptionOutcomeEvent.json
@@ -0,0 +1,21 @@
+{
+ "eventType": "PARTIAL_OUTCOME",
+ "event": {
+ "subscription": {
+ "clientID": "SCO-9989752",
+ "name": "cm-subscription-001"
+ },
+ "predicates": {
+ "rejectedTargets": [
+ "CMHandle3"
+ ],
+ "acceptedTargets": [
+ "CMHandle1"
+ ],
+ "pendingTargets": [
+ "CMHandle4",
+ "CMHandle5"
+ ]
+ }
+ }
+} \ No newline at end of file
diff --git a/docs/cps-path.rst b/docs/cps-path.rst
index 08892e09e..bb482c2ed 100644
--- a/docs/cps-path.rst
+++ b/docs/cps-path.rst
@@ -223,7 +223,7 @@ descendant-path
leaf-conditions
---------------
-**Syntax**: ``<xpath> '[' @<leaf-name1> '=' <leaf-value1> ( ' <and|or> ' @<leaf-name> '=' <leaf-value> )* ']'``
+**Syntax**: ``<xpath> '[' @<leaf-name1> '(=|>|<|>=|<=)' <leaf-value1> ( ' <and|or> ' @<leaf-name> '(=|>|<|>=|<=)' <leaf-value> )* ']'``
- ``xpath``: Absolute or descendant or xpath to the (list) node which elements will be queried.
- ``leaf-name``: The name of the leaf which value needs to be compared.
- ``leaf-value``: The required value of the leaf.
@@ -234,6 +234,8 @@ leaf-conditions
- ``//categories[@name='Kids']``
- ``//categories[@code='1']/books/book[@title='Dune' and @price=5]``
- ``//categories[@code='1']/books/book[@title='xyz' or @price=15]``
+ - ``//categories[@code='1']/books/book[@title='xyz' or @price>20]``
+ - ``//categories[@code='1']/books/book[@title='Dune' and @price<=5]``
- ``//categories[@code=1]``
**Limitations**
- Only the last list or container can be queried leaf values. Any ancestor list will have to be referenced by its key name-value pair(s).
@@ -242,6 +244,7 @@ leaf-conditions
- Leaf names are not validated so ``or`` operations with invalid leaf names will silently be ignored.
- Only leaves can be used, leaf-list are not supported.
- Only string and integer values are supported, boolean and float values are not supported.
+ - Using comparative operators with string values will lead to an error at runtime. This error can't be validated earlier as the datatype is unknown until the execution phase.
- The key should be supplied with correct data type for it to be queried from DB. In the last example above the attribute code is of type
Integer so the cps query will not work if the value is passed as string.
eg: ``//categories[@code="1"]`` or ``//categories[@code='1']`` will not work because the key attribute code is treated a string.
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/CpsQueryServiceIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/CpsQueryServiceIntegrationSpec.groovy
index 233c58fb6..fa0b82045 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/CpsQueryServiceIntegrationSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/CpsQueryServiceIntegrationSpec.groovy
@@ -251,7 +251,7 @@ class CpsQueryServiceIntegrationSpec extends FunctionalSpecBase {
'all' | INCLUDE_ALL_DESCENDANTS || 17
}
- def 'Cps Path query with syntax error throws a CPS Path Exception.'() {
+ def 'Cps Path query with #scenario throws a CPS Path Exception.'() {
when: 'trying to execute a query with a syntax (parsing) error'
objectUnderTest.queryDataNodes(FUNCTIONAL_TEST_DATASPACE_1, BOOKSTORE_ANCHOR_1, cpsPath, OMIT_DESCENDANTS)
then: 'a cps path exception is thrown'