summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/pom.xml22
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java127
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java13
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java25
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java7
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java139
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java30
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java21
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy10
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy62
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy17
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy29
-rw-r--r--cps-ncmp-service/src/test/resources/application.yml8
17 files changed, 481 insertions, 91 deletions
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml
index b87fe64366..19ef988d30 100644
--- a/cps-ncmp-service/pom.xml
+++ b/cps-ncmp-service/pom.xml
@@ -27,7 +27,7 @@
<parent>
<groupId>org.onap.cps</groupId>
<artifactId>cps-parent</artifactId>
- <version>3.3.2-SNAPSHOT</version>
+ <version>3.3.3-SNAPSHOT</version>
<relativePath>../cps-parent/pom.xml</relativePath>
</parent>
@@ -42,6 +42,18 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-json-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-spring</artifactId>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>cps-service</artifactId>
</dependency>
@@ -54,8 +66,8 @@
<artifactId>cps-path-parser</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-web</artifactId>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast-spring</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
@@ -66,8 +78,8 @@
<artifactId>mapstruct-processor</artifactId>
</dependency>
<dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-spring</artifactId>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
</dependency>
<!-- T E S T - D E P E N D E N C I E S -->
<dependency>
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java
new file mode 100644
index 0000000000..b76f86ebeb
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.kafka;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+/**
+ * kafka Configuration for legacy and cloud events.
+ *
+ * @param <T> valid legacy event to be published over the wire.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class KafkaTemplateConfig<T> {
+
+ private final KafkaProperties kafkaProperties;
+
+ /**
+ * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
+ * application.yml and replaces value-serializer by JsonSerializer.
+ *
+ * @return legacy event producer instance.
+ */
+ @Bean
+ public ProducerFactory<String, T> legacyEventProducerFactory() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ }
+
+ /**
+ * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
+ * into application.yml and replaces deserializer-value by JsonDeserializer.
+ *
+ * @return an instance of legacy consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
+ * application.yml with CloudEventSerializer.
+ *
+ * @return cloud event producer instance.
+ */
+ @Bean
+ public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ }
+
+ /**
+ * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
+ * into application.yml having CloudEventDeserializer as deserializer-value.
+ *
+ * @return an instance of cloud consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+ *
+ * @return an instance of legacy Kafka template.
+ */
+ @Bean
+ @Primary
+ public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+ final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+ /**
+ * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+ *
+ * @return an instance of cloud Kafka template.
+ */
+ @Bean
+ public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index d92316dc58..7b28b4cd5f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -20,6 +20,7 @@
package org.onap.cps.ncmp.api.impl.events;
+import io.cloudevents.CloudEvent;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -42,7 +43,12 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
@RequiredArgsConstructor
public class EventsPublisher<T> {
- private final KafkaTemplate<String, T> eventKafkaTemplate;
+ /** Once all cps events will be modified to cloud compliant, will remove legacyKafkaEventTemplate with
+ it's java configuration file KafkaTemplateConfig. **/
+ @Deprecated(forRemoval = true)
+ private final KafkaTemplate<String, T> legacyKafkaEventTemplate;
+
+ private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
/**
* Generic Event publisher.
@@ -54,7 +60,8 @@ public class EventsPublisher<T> {
*/
@Deprecated
public void publishEvent(final String topicName, final String eventKey, final T event) {
- final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event);
+ final ListenableFuture<SendResult<String, T>> eventFuture
+ = legacyKafkaEventTemplate.send(topicName, eventKey, event);
eventFuture.addCallback(handleCallback(topicName));
}
@@ -70,7 +77,7 @@ public class EventsPublisher<T> {
final ProducerRecord<String, T> producerRecord =
new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
- final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(producerRecord);
+ final ListenableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord);
eventFuture.addCallback(handleCallback(topicName));
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
index a74682571b..1bfc4ab28b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
@@ -22,8 +22,6 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
@@ -104,29 +102,12 @@ public class SubscriptionEventResponseOutcome {
private SubscriptionEventResponse toSubscriptionEventResponse(
final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
final String subscriptionName) {
- final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = new HashMap<>();
+ final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
+ DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
+
final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
subscriptionEventResponse.setClientId(subscriptionClientId);
subscriptionEventResponse.setSubscriptionName(subscriptionName);
-
- for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) {
- final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator();
- while (bucketIterator.hasNext()) {
- final String item = (String) bucketIterator.next();
- if ("PENDING".equals(item)) {
- cmHandleIdToStatusMap.put((String) bucketIterator.next(),
- SubscriptionStatus.PENDING);
- }
- if ("REJECTED".equals(item)) {
- cmHandleIdToStatusMap.put((String) bucketIterator.next(),
- SubscriptionStatus.REJECTED);
- }
- if ("ACCEPTED".equals(item)) {
- cmHandleIdToStatusMap.put((String) bucketIterator.next(),
- SubscriptionStatus.ACCEPTED);
- }
- }
- }
subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap);
return subscriptionEventResponse;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
index f240c4510d..27d4266566 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java
@@ -39,4 +39,11 @@ public interface SubscriptionPersistence {
* @return the DataNode as collection.
*/
Collection<DataNode> getDataNodesForSubscriptionEvent();
+
+ /**
+ * Get data nodes by xpath.
+ *
+ * @return the DataNode as collection.
+ */
+ Collection<DataNode> getCmHandlesForSubscriptionEvent(String clientId, String subscriptionName);
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
index 9a063d6dfd..d2b1237a4d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java
@@ -22,11 +22,18 @@ package org.onap.cps.ncmp.api.impl.subscriptions;
import static org.onap.cps.ncmp.api.impl.constants.DmiRegistryConstants.NO_TIMESTAMP;
+import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsDataService;
+import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.model.DataNode;
@@ -41,35 +48,86 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
private static final String SUBSCRIPTION_DATASPACE_NAME = "NCMP-Admin";
private static final String SUBSCRIPTION_ANCHOR_NAME = "AVC-Subscriptions";
private static final String SUBSCRIPTION_REGISTRY_PARENT = "/subscription-registry";
-
private final JsonObjectMapper jsonObjectMapper;
private final CpsDataService cpsDataService;
@Override
public void saveSubscriptionEvent(final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
- final String subscriptionEventJsonData =
- createSubscriptionEventJsonData(jsonObjectMapper.asJsonString(yangModelSubscriptionEvent));
+ final String clientId = yangModelSubscriptionEvent.getClientId();
+ final String subscriptionName = yangModelSubscriptionEvent.getSubscriptionName();
+
final Collection<DataNode> dataNodes = cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME,
SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+
+ if (isSubscriptionRegistryEmptyOrNonExist(dataNodes, clientId, subscriptionName)) {
+ saveSubscriptionEventYangModel(createSubscriptionEventJsonData(
+ jsonObjectMapper.asJsonString(yangModelSubscriptionEvent)));
+ } else {
+ findDeltaCmHandlesAddOrUpdateInDatabase(yangModelSubscriptionEvent, clientId, subscriptionName, dataNodes);
+ }
+ }
+
+ private void findDeltaCmHandlesAddOrUpdateInDatabase(final YangModelSubscriptionEvent yangModelSubscriptionEvent,
+ final String clientId, final String subscriptionName,
+ final Collection<DataNode> dataNodes) {
+ final Map<String, SubscriptionStatus> cmHandleIdsFromYangModel =
+ extractCmHandleFromYangModelAsMap(yangModelSubscriptionEvent);
+ final Map<String, SubscriptionStatus> cmHandleIdsFromDatabase =
+ extractCmHandleFromDbAsMap(dataNodes);
+
+ final Map<String, SubscriptionStatus> newCmHandles =
+ mapDifference(cmHandleIdsFromYangModel, cmHandleIdsFromDatabase);
+ traverseCmHandleList(newCmHandles, clientId, subscriptionName, true);
+
+ final Map<String, SubscriptionStatus> existingCmHandles =
+ mapDifference(cmHandleIdsFromYangModel, newCmHandles);
+ traverseCmHandleList(existingCmHandles, clientId, subscriptionName, false);
+ }
+
+ private boolean isSubscriptionRegistryEmptyOrNonExist(final Collection<DataNode> dataNodes,
+ final String clientId, final String subscriptionName) {
final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst();
- final boolean isCreateOperation =
- dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty();
- saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, isCreateOperation);
+ return ((dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty())
+ || getCmHandlesForSubscriptionEvent(clientId, subscriptionName).isEmpty());
+ }
+
+ private void traverseCmHandleList(final Map<String, SubscriptionStatus> cmHandleMap,
+ final String clientId,
+ final String subscriptionName,
+ final boolean isAddListElementOperation) {
+ final List<YangModelSubscriptionEvent.TargetCmHandle> cmHandleList =
+ targetCmHandlesAsList(cmHandleMap);
+ for (final YangModelSubscriptionEvent.TargetCmHandle targetCmHandle : cmHandleList) {
+ final String targetCmHandleAsJson =
+ createTargetCmHandleJsonData(jsonObjectMapper.asJsonString(targetCmHandle));
+ addOrReplaceCmHandlePredicateListElement(targetCmHandleAsJson, clientId, subscriptionName,
+ isAddListElementOperation);
+ }
}
- private void saveOrUpdateSubscriptionEventYangModel(final String subscriptionEventJsonData,
- final boolean isCreateOperation) {
- if (isCreateOperation) {
- log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData);
- cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+ private void addOrReplaceCmHandlePredicateListElement(final String targetCmHandleAsJson,
+ final String clientId,
+ final String subscriptionName,
+ final boolean isAddListElementOperation) {
+ if (isAddListElementOperation) {
+ log.info("targetCmHandleAsJson to be added into DB {}", targetCmHandleAsJson);
+ cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME,
+ SUBSCRIPTION_ANCHOR_NAME, createCmHandleXpathPredicates(clientId, subscriptionName),
+ targetCmHandleAsJson, NO_TIMESTAMP);
} else {
- log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData);
- cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+ log.info("targetCmHandleAsJson to be updated into DB {}", targetCmHandleAsJson);
+ cpsDataService.updateNodeLeaves(SUBSCRIPTION_DATASPACE_NAME,
+ SUBSCRIPTION_ANCHOR_NAME, createCmHandleXpathPredicates(clientId, subscriptionName),
+ targetCmHandleAsJson, NO_TIMESTAMP);
}
}
+ private void saveSubscriptionEventYangModel(final String subscriptionEventJsonData) {
+ log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData);
+ cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP);
+ }
+
@Override
public Collection<DataNode> getDataNodesForSubscriptionEvent() {
return cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME,
@@ -77,7 +135,58 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence {
FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
}
+ @Override
+ public Collection<DataNode> getCmHandlesForSubscriptionEvent(final String clientId, final String subscriptionName) {
+ return cpsDataService.getDataNodesForMultipleXpaths(SUBSCRIPTION_DATASPACE_NAME,
+ SUBSCRIPTION_ANCHOR_NAME, Arrays.asList(createCmHandleXpath(clientId, subscriptionName)),
+ FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+ }
+
+ private static Map<String, SubscriptionStatus> extractCmHandleFromDbAsMap(final Collection<DataNode> dataNodes) {
+ final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
+ final List<Collection<Serializable>> cmHandleIdToStatus = DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
+ return DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
+ }
+
+ private static Map<String, SubscriptionStatus> extractCmHandleFromYangModelAsMap(
+ final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
+ return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles()
+ .stream().collect(Collectors.toMap(
+ YangModelSubscriptionEvent.TargetCmHandle::getCmHandleId,
+ YangModelSubscriptionEvent.TargetCmHandle::getStatus));
+ }
+
+ private static List<YangModelSubscriptionEvent.TargetCmHandle> targetCmHandlesAsList(
+ final Map<String, SubscriptionStatus> newCmHandles) {
+ return newCmHandles.entrySet().stream().map(entry ->
+ new YangModelSubscriptionEvent.TargetCmHandle(entry.getKey(),
+ entry.getValue())).collect(Collectors.toList());
+ }
+
private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) {
return "{\"subscription\":[" + yangModelSubscriptionAsJson + "]}";
}
+
+ private static String createTargetCmHandleJsonData(final String targetCmHandleAsJson) {
+ return "{\"targetCmHandles\":[" + targetCmHandleAsJson + "]}";
+ }
+
+ private static String createCmHandleXpathPredicates(final String clientId, final String subscriptionName) {
+ return "/subscription-registry/subscription[@clientID='" + clientId
+ + "' and @subscriptionName='" + subscriptionName + "']/predicates";
+ }
+
+ private static String createCmHandleXpath(final String clientId, final String subscriptionName) {
+ return "/subscription-registry/subscription[@clientID='" + clientId
+ + "' and @subscriptionName='" + subscriptionName + "']";
+ }
+
+ private static <K, V> Map<K, V> mapDifference(final Map<? extends K, ? extends V> left,
+ final Map<? extends K, ? extends V> right) {
+ final Map<K, V> difference = new HashMap<>();
+ difference.putAll(left);
+ difference.putAll(right);
+ difference.entrySet().removeAll(right.entrySet());
+ return difference;
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
index 0b4f91fac3..ce3b88ba03 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java
@@ -20,8 +20,36 @@
package org.onap.cps.ncmp.api.impl.subscriptions;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+
public enum SubscriptionStatus {
ACCEPTED,
REJECTED,
- PENDING
+ PENDING;
+
+
+ /**
+ * Populates a map with a key of cm handle id and a value of subscription status.
+ *
+ * @param resultMap the map is being populated
+ * @param bucketIterator to iterate over the collection
+ */
+ public static void populateCmHandleToSubscriptionStatusMap(final Map<String, SubscriptionStatus> resultMap,
+ final Iterator<Serializable> bucketIterator) {
+ final String item = (String) bucketIterator.next();
+ if ("PENDING".equals(item)) {
+ resultMap.put((String) bucketIterator.next(),
+ SubscriptionStatus.PENDING);
+ }
+ if ("REJECTED".equals(item)) {
+ resultMap.put((String) bucketIterator.next(),
+ SubscriptionStatus.REJECTED);
+ }
+ if ("ACCEPTED".equals(item)) {
+ resultMap.put((String) bucketIterator.next(),
+ SubscriptionStatus.ACCEPTED);
+ }
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
index 1648ac4fbb..8d44592ae2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
@@ -22,12 +22,15 @@ package org.onap.cps.ncmp.api.impl.utils;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.spi.model.DataNode;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -72,4 +75,22 @@ public class DataNodeHelper {
|| col.contains("REJECTED"))
.collect(Collectors.toList());
}
+
+ /**
+ * The cm handle and status is returned as a map.
+ *
+ * @param cmHandleIdToStatus as a list of collection
+ * @return a map of cm handle id to status
+ */
+ public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMap(
+ final List<Collection<Serializable>> cmHandleIdToStatus) {
+ final Map<String, SubscriptionStatus> resultMap = new HashMap<>();
+ for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) {
+ final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator();
+ while (bucketIterator.hasNext()) {
+ SubscriptionStatus.populateCmHandleToSubscriptionStatusMap(resultMap, bucketIterator);
+ }
+ }
+ return resultMap;
+ }
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
index bcf75a29b2..fe7b3f11cb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -34,7 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
-
import java.time.Duration
@SpringBootTest(classes = [EventsPublisher, NcmpAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
@SpringBean
EventsPublisher cpsAsyncRequestResponseEventPublisher =
- new EventsPublisher<NcmpAsyncRequestResponseEvent>(kafkaTemplate);
+ new EventsPublisher<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
@SpringBean
@@ -59,18 +59,18 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
@Autowired
JsonObjectMapper jsonObjectMapper
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
def 'Consume and forward valid message'() {
given: 'consumer has a subscription'
- kafkaConsumer.subscribe(['test-topic'] as List<String>)
+ legacyEventKafkaConsumer.subscribe(['test-topic'] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class)
when: 'the event is consumed'
ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent)
and: 'the topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'consumed forwarded event id is the same as sent event id'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
index 28464bb91c..02071cd8cf 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.SerializationUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1
@@ -46,7 +47,7 @@ import java.time.Duration
class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(kafkaTemplate)
+ EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher)
@@ -57,19 +58,19 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
@Autowired
RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
def static clientTopic = 'client-topic'
def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1'
def 'Consume and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
- kafkaConsumer.subscribe([clientTopic])
+ legacyEventKafkaConsumer.subscribe([clientTopic])
and: 'consumer record for batch event'
def consumerRecordIn = createConsumerRecord(batchEventType)
when: 'the batch event is consumed and published to client specified topic'
asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn)
and: 'the client specified topic is polled'
- def consumerRecordOut = kafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
then: 'verifying consumed event operationID is same as published event operationID'
def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId
def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
new file mode 100644
index 0000000000..ed5f161258
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.kafka;
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.support.serializer.JsonSerializer
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+class KafkaTemplateConfigSpec extends Specification {
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, String> legacyEventKafkaTemplate
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+
+ def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
+ expect: 'kafka template is instantiated'
+ assert kafkaTemplateInstance.properties['beanName'] == beanName
+ and: 'verify event key and value serializer'
+ assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName())
+ and: 'verify event key and value deserializer'
+ assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
+ where: 'the following event type is used'
+ eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
+ 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
+ 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
index 5f54bbe3dd..3dffac714b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -48,7 +49,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
@SpringBean
- EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(kafkaTemplate)
+ EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper)
@@ -56,13 +57,13 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
@Autowired
JsonObjectMapper jsonObjectMapper
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def 'Consume and forward valid message'() {
given: 'consumer has a subscription on a topic'
def cmEventsTopicName = 'cm-events'
acvEventConsumer.cmEventsTopicName = cmEventsTopicName
- kafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
+ legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
and: 'an event is sent'
def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
@@ -73,7 +74,7 @@ class AvcEventConsumerSpec extends MessagingBaseSpec {
when: 'the event is consumed'
acvEventConsumer.consumeAndForward(consumerRecord)
and: 'the topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record can be converted to AVC event'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
index 93741261f6..4c6880421b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.lcm
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.lcm.v1.Event
@@ -42,12 +43,12 @@ import java.time.Duration
@DirtiesContext
class LcmEventsPublisherSpec extends MessagingBaseSpec {
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(kafkaTemplate)
+ EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@Autowired
JsonObjectMapper jsonObjectMapper
@@ -82,11 +83,11 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
eventSchema : eventSchema,
eventSchemaVersion: eventSchemaVersion]
and: 'consumer has a subscription'
- kafkaConsumer.subscribe([testTopic] as List<String>)
+ legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
when: 'an event is published'
lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
- def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+ def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record key matches the expected event key'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
index a372abe6ff..ec54e8917a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceSpec.groovy
@@ -34,6 +34,7 @@ class SubscriptionPersistenceSpec extends Specification {
private static final String SUBSCRIPTION_DATASPACE_NAME = "NCMP-Admin";
private static final String SUBSCRIPTION_ANCHOR_NAME = "AVC-Subscriptions";
private static final String SUBSCRIPTION_REGISTRY_PARENT = "/subscription-registry";
+ private static final String SUBSCRIPTION_REGISTRY_PREDICATES_XPATH = "/subscription-registry/subscription[@clientID='some-client-id' and @subscriptionName='some-subscription-name']/predicates";
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
def mockCpsDataService = Mock(CpsDataService)
@@ -45,11 +46,11 @@ class SubscriptionPersistenceSpec extends Specification {
def yangModelSubscriptionEvent = new YangModelSubscriptionEvent(clientId: 'some-client-id',
subscriptionName: 'some-subscription-name', tagged: true, topic: 'some-topic', predicates: predicates)
- def 'save a subscription event' () {
- given: 'a data node that does not exist in db'
+ def 'save a subscription event as yang model into db for the #scenarios' () {
+ given: 'a blank data node that exist in db'
def blankDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry').build()
- and: 'cps data service return non existing data node'
+ and: 'cps data service return an empty data node'
mockCpsDataService.getDataNodes(*_) >> [blankDataNode]
when: 'the yangModelSubscriptionEvent is saved into db'
objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
@@ -63,24 +64,28 @@ class SubscriptionPersistenceSpec extends Specification {
NO_TIMESTAMP)
}
- def 'update a subscription event' () {
- given: 'a data node exist in db'
+ def 'add or replace cm handle list element into db' () {
+ given: 'a data node with child node exist in db'
+ def leaves1 = [status:'PENDING', cmHandleId:'cmhandle1'] as Map
def childDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
- .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription').build()
+ .withAnchor('AVC-Subscriptions').withXpath('/subscription-registry/subscription')
+ .withLeaves(leaves1).build()
def engagedDataNode = new DataNodeBuilder().withDataspace('NCMP-Admin')
.withAnchor('AVC-Subscriptions').withXpath('/subscription-registry')
.withChildDataNodes([childDataNode]).build()
- and: 'cps data service return existing data node'
+ and: 'cps data service return data node including a child data node'
mockCpsDataService.getDataNodes(*_) >> [engagedDataNode]
- when: 'the yangModelSubscriptionEvent is saved into db'
+ and: 'cps data service return data node for querying by xpaths'
+ mockCpsDataService.getDataNodesForMultipleXpaths(*_) >> [engagedDataNode]
+ when: 'the yang model subscription event is saved into db'
objectUnderTest.saveSubscriptionEvent(yangModelSubscriptionEvent)
- then: 'the cpsDataService update operation is called with the correct data'
- 1 * mockCpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- SUBSCRIPTION_REGISTRY_PARENT,
- '{"subscription":[{' +
- '"topic":"some-topic",' +
- '"predicates":{"datastore":"some-datastore","targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING"},{"cmHandleId":"cmhandle2","status":"PENDING"}]},' +
- '"clientID":"some-client-id","subscriptionName":"some-subscription-name","isTagged":true}]}',
+ then: 'the cpsDataService save non-existing cm handle with the correct data'
+ 1 * mockCpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle2","status":"PENDING"}]}',
+ NO_TIMESTAMP)
+ and: 'the cpsDataService update existing cm handle with the correct data'
+ 1 * mockCpsDataService.updateNodeLeaves(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ SUBSCRIPTION_REGISTRY_PREDICATES_XPATH, '{"targetCmHandles":[{"cmHandleId":"cmhandle1","status":"PENDING"}]}',
NO_TIMESTAMP)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
index e527ae12bb..ee726a908e 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/DataNodeHelperSpec.groovy
@@ -20,6 +20,7 @@
package org.onap.cps.ncmp.api.impl.utils
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus
import org.onap.cps.spi.model.DataNodeBuilder
class DataNodeHelperSpec extends DataNodeBaseSpec {
@@ -51,8 +52,22 @@ class DataNodeHelperSpec extends DataNodeBaseSpec {
and: 'the nested data node is flatten and retrieves the leaves '
def leaves = DataNodeHelper.getDataNodeLeaves([dataNode])
when:'cm handle id to status is retrieved'
- def result = DataNodeHelper.getCmHandleIdToStatus(leaves);
+ def result = DataNodeHelper.getCmHandleIdToStatus(leaves)
then: 'the result list size is 3'
result.size() == 3
+ and: 'the result contains expected values'
+ result[0] as List == ['PENDING', 'CMHandle3']
+ result[1] as List == ['ACCEPTED', 'CMHandle2']
+ result[2] as List == ['REJECTED', 'CMHandle1']
+ }
+
+ def 'Get cm handle id to status map as expected from list of collection' () {
+ given: 'a list of collection'
+ def cmHandleCollection = [['PENDING', 'CMHandle3'], ['ACCEPTED', 'CMHandle2'], ['REJECTED', 'CMHandle1']]
+ when: 'the map is formed up with a method call'
+ def result = DataNodeHelper.getCmHandleIdToStatusMap(cmHandleCollection)
+ then: 'the map values are as expected'
+ result.keySet() == ['CMHandle3', 'CMHandle2', 'CMHandle1'] as Set
+ result.values() as List == [SubscriptionStatus.PENDING, SubscriptionStatus.ACCEPTED, SubscriptionStatus.REJECTED]
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
index 337178e128..603b8cdda6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
@@ -20,6 +20,8 @@
package org.onap.cps.ncmp.api.kafka
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.spockframework.spring.SpringBean
@@ -44,30 +46,33 @@ class MessagingBaseSpec extends Specification {
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
- def producerConfigProperties() {
+ @SpringBean
+ KafkaTemplate legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(eventProducerConfigProperties(JsonSerializer)))
+
+ @SpringBean
+ KafkaTemplate cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+ @DynamicPropertySource
+ static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+ dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+ }
+
+ def eventProducerConfigProperties(valueSerializer) {
return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
('retries') : 0,
('batch-size') : 16384,
('linger.ms') : 1,
('buffer.memory') : 33554432,
('key.serializer') : StringSerializer,
- ('value.serializer') : JsonSerializer]
+ ('value.serializer') : valueSerializer]
}
- def consumerConfigProperties(consumerGroupId) {
+ def eventConsumerConfigProperties(consumerGroupId, valueSerializer) {
return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
('key.deserializer') : StringDeserializer,
- ('value.deserializer'): StringDeserializer,
+ ('value.deserializer'): valueSerializer,
('auto.offset.reset') : 'earliest',
('group.id') : consumerGroupId
]
}
-
- @SpringBean
- KafkaTemplate kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
-
- @DynamicPropertySource
- static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
- dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
- }
}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 1016f2b033..197bfda19c 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -16,6 +16,14 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=========================================================
+spring:
+ kafka:
+ producer:
+ value-serializer: io.cloudevents.kafka.CloudEventSerializer
+ consumer:
+ properties:
+ spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
+
app:
ncmp:
avc: