summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java46
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java16
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java21
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java33
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java30
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java30
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java34
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy79
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy6
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy35
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy33
16 files changed, 479 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java
new file mode 100644
index 0000000000..6122afc808
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api;
+
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata;
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest;
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest;
+
+public interface DataJobService {
+
+ /**
+ * process read data job operations.
+ *
+ * @param dataJobId Unique identifier of the job within the request
+ * @param dataJobMetadata data job request headers
+ * @param dataJobReadRequest read data job request
+ */
+ void readDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobReadRequest dataJobReadRequest);
+
+ /**
+ * process write data job operations.
+ *
+ * @param dataJobId Unique identifier of the job within the request
+ * @param dataJobMetadata data job request headers
+ * @param dataJobWriteRequest write data job request
+ */
+ void writeDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobWriteRequest dataJobWriteRequest);
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
index 462679e74f..bdc3dee772 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
@@ -37,7 +37,8 @@ public enum NcmpResponseStatus {
UNKNOWN_ERROR("108", "Unknown error"),
CM_HANDLE_ALREADY_EXIST("109", "cm-handle already exists"),
CM_HANDLE_INVALID_ID("110", "cm-handle has an invalid character(s) in id"),
- ALTERNATE_ID_ALREADY_ASSOCIATED("111", "alternate id already associated");
+ ALTERNATE_ID_ALREADY_ASSOCIATED("111", "alternate id already associated"),
+ MESSAGE_TOO_LARGE("112", "message too large");
private final String code;
private final String message;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java
new file mode 100644
index 0000000000..b4377b84f2
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.DataJobService;
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata;
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest;
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest;
+
+@Slf4j
+public class DataJobServiceImpl implements DataJobService {
+
+ @Override
+ public void readDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ final DataJobReadRequest dataJobReadRequest) {
+ log.info("data job id for read operation is: {}", dataJobId);
+ }
+
+ @Override
+ public void writeDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ final DataJobWriteRequest dataJobWriteRequest) {
+ log.info("data job id for write operation is: {}", dataJobId);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
index 4f2674aca3..167df5a98d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@@ -51,6 +52,8 @@ public class KafkaConfig<T> {
private final KafkaProperties kafkaProperties;
+ private static final SslBundles NO_SSL = null;
+
/**
* This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
* application.yml and replaces value-serializer by JsonSerializer.
@@ -59,7 +62,7 @@ public class KafkaConfig<T> {
*/
@Bean
public ProducerFactory<String, T> legacyEventProducerFactory() {
- final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(producerConfigProperties);
}
@@ -72,7 +75,7 @@ public class KafkaConfig<T> {
*/
@Bean
public ConsumerFactory<String, T> legacyEventConsumerFactory() {
- final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
@@ -112,7 +115,7 @@ public class KafkaConfig<T> {
*/
@Bean
public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
- final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
return new DefaultKafkaProducerFactory<>(producerConfigProperties);
}
@@ -124,7 +127,7 @@ public class KafkaConfig<T> {
*/
@Bean
public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
- final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
@@ -136,7 +139,8 @@ public class KafkaConfig<T> {
*/
@Bean
public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
- final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate =
+ new KafkaTemplate<>(cloudEventProducerFactory());
kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
return kafkaTemplate;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
index ea72fd217b..82ae5467e2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
@@ -23,16 +23,21 @@ package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
+@RequiredArgsConstructor
public class CmNotificationSubscriptionDmiOutEventConsumer {
+ private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
+
/**
* Consume the Cm Notification Subscription event from the dmi-plugin.
*
@@ -56,7 +61,23 @@ public class CmNotificationSubscriptionDmiOutEventConsumer {
final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent) {
final String subscriptionId = correlationId.split("#")[0];
final String dmiPluginName = correlationId.split("#")[1];
+
+ if ("ACCEPTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
+ dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+
+ if ("REJECTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
+ }
+
log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
}
+
+ private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
+ final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
+ dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
+ dmiPluginName, cmNotificationSubscriptionStatus);
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
index 2f10b1ce1e..8c1cac39dc 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
@@ -30,8 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
@@ -42,6 +44,7 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class DmiCmNotificationSubscriptionCacheHandler {
+ private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
private final InventoryPersistence inventoryPersistence;
@@ -83,6 +86,46 @@ public class DmiCmNotificationSubscriptionCacheHandler {
return dmiCmNotificationSubscriptionDetailsPerDmi;
}
+ /**
+ * Update status in map of subscription details per DMI.
+ *
+ * @param subscriptionId String of subscription Id
+ * @param dmiServiceName String of dmiServiceName
+ * @param status String of status
+ *
+ */
+ public void updateDmiCmNotificationSubscriptionStatusPerDmi(
+ final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) {
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .setCmNotificationSubscriptionStatus(status);
+ }
+
+ /**
+ * Persist map of subscription details per DMI.
+ *
+ * @param subscriptionId String of subscription Id
+ * @param dmiServiceName String of dmiServiceName
+ *
+ */
+ public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
+ final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .getDmiCmNotificationSubscriptionPredicates();
+ for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
+ dmiCmNotificationSubscriptionPredicateList) {
+ final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+
+ for (final String cmHandle: cmHandles) {
+ for (final String xpath: xpaths) {
+ cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,
+ cmHandle, xpath, subscriptionId);
+ }
+ }
+ }
+ }
+
private void updateDmiCmNotificationSubscriptionDetailsPerDmi(
final String dmiServiceName,
final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java
new file mode 100644
index 0000000000..dc8037b86f
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+/**
+ * Metadata of read/write data job request.
+ *
+ * @param destination The destination of the data job results.
+ * @param dataAcceptType Define the data response accept type.
+ * e.g. "application/vnd.3gpp.object-tree-hierarchical+json",
+ * "application/vnd.3gpp.object-tree-flat+json" etc.
+ * @param dataContentType Define the data request content type.
+ * e.g. "application/3gpp-json-patch+json" etc.
+ */
+public record DataJobMetadata(String destination, String dataAcceptType, String dataContentType) {} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java
new file mode 100644
index 0000000000..f861c3d498
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Describes the read data job operation to be forwarded to dmi.
+ *
+ * @param data List of read operations to be executed.
+ */
+public record DataJobReadRequest(List<ReadOperation> data) {}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java
new file mode 100644
index 0000000000..254e198b81
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Describes the write data job operation to be forwarded to dmi.
+ *
+ * @param data List of write operations to be executed.
+ */
+public record DataJobWriteRequest(List<WriteOperation> data) {}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java
new file mode 100644
index 0000000000..d2b0738969
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Holds information of read data job operation.
+ * based on <a href="https://www.etsi.org/deliver/etsi_ts/128500_128599/128532/16.04.00_60/ts_128532v160400p.pdf">ETSI TS 128 532 V16.4.0 (2020-08)</a>
+ *
+ * @param path Identifier of a managed object (MO) on a network element. Defines the resource on which operation
+ * is executed. Url Encoded Fully Distinguished Name (FDN).
+ * @param op Describes the operation to execute. The value can only be "read".
+ * @param operationId Unique identifier of the operation within the request.
+ * @param attributes Specifies the attributes of the resources that are returned.
+ * @param fields Specifies the attribute fields of the resources that are returned. This should be used if an
+ * attribute is a struct and only a subset of its fields should be returned.
+ * @param filter This filters the managed Objects.
+ * @param scopeType This selects MOs depending on relationships with Base Managed Object.
+ * e.g. "BASE_ONLY", "BASE_ALL", "BASE_NTH_LEVEL" etc.
+ * @param scopeLevel Defines the level for objects to be returned for certain scopeTypes. The base level is zero.
+ */
+public record ReadOperation(String path, String op, String operationId, List<String> attributes, List<String> fields,
+ String filter, String scopeType, int scopeLevel) {
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java
new file mode 100644
index 0000000000..c2f6504ce2
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+/**
+ * Holds information of write data job operation.
+ * based on <a href="https://www.etsi.org/deliver/etsi_ts/128500_128599/128532/16.04.00_60/ts_128532v160400p.pdf">ETSI TS 128 532 V16.4.0 (2020-08)</a>
+ *
+ * @param path Identifier of a managed object (MO) on a network element. Defines the resource on which operation
+ * is executed. Typically, is Fully Distinguished Name (FDN).
+ * @param op Describes the operation to execute. The value can be as below:
+ * e.g. "add", "replace", "remove", "action" etc.
+ * @param operationId Unique identifier of the operation within the request.
+ * @param value The value to be written depends on the type of operation.
+ */
+public record WriteOperation(String path, String op, String operationId, Object value) {}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy
new file mode 100644
index 0000000000..43787640a3
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import org.slf4j.LoggerFactory
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata
+import org.onap.cps.ncmp.api.models.datajob.ReadOperation
+import org.onap.cps.ncmp.api.models.datajob.WriteOperation
+import spock.lang.Specification
+
+class DataJobServiceImplSpec extends Specification{
+
+ def objectUnderTest = new DataJobServiceImpl()
+
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
+ def setup() {
+ setupLogger()
+ }
+
+ def cleanup() {
+ ((Logger) LoggerFactory.getLogger(DataJobServiceImpl.class)).detachAndStopAllAppenders()
+ }
+
+ def '#operation data job request.'() {
+ given: 'data job metadata'
+ def dataJobMetadata = new DataJobMetadata('client-topic', 'application/vnd.3gpp.object-tree-hierarchical+json', 'application/3gpp-json-patch+json')
+ when: 'read/write data job request is processed'
+ if (operation == 'read') {
+ objectUnderTest.readDataJob('some-job-id', dataJobMetadata, new DataJobReadRequest([getWriteOrReadOperationRequest(operation)]))
+ } else {
+ objectUnderTest.writeDataJob('some-job-id', dataJobMetadata, new DataJobWriteRequest([getWriteOrReadOperationRequest(operation)]))
+ }
+ then: 'the data job id is correctly logged'
+ def loggingEvent = logger.list[0]
+ assert loggingEvent.level == Level.INFO
+ assert loggingEvent.formattedMessage.contains('data job id for ' + operation + ' operation is: some-job-id')
+ where: 'the following data job operations are used'
+ operation << ['read', 'write']
+ }
+
+ def getWriteOrReadOperationRequest(operation) {
+ if (operation == 'write') {
+ return new WriteOperation('some/write/path', 'add', 'some-operation-id', 'some-value')
+ }
+ return new ReadOperation('some/read/path', 'read', 'some-operation-id', ['some-attrib-1'], ['some-field-1'], 'some-filter', 'some-scope-type', 1)
+ }
+
+ def setupLogger() {
+ def setupLogger = ((Logger) LoggerFactory.getLogger(DataJobServiceImpl.class))
+ setupLogger.setLevel(Level.DEBUG)
+ setupLogger.addAppender(logger)
+ logger.start()
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
index d47be6cd5c..4d0af6f490 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
@@ -23,9 +23,6 @@
package org.onap.cps.ncmp.api.impl
-import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse
-import org.onap.cps.ncmp.api.models.CmResourceAddress
-
import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME
import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
@@ -35,6 +32,8 @@ import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RU
import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
+import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse
+import org.onap.cps.ncmp.api.models.CmResourceAddress
import org.onap.cps.ncmp.api.impl.utils.AlternateIdChecker
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.NetworkCmProxyCmHandleQueryService
@@ -57,7 +56,6 @@ import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
import org.onap.cps.ncmp.api.models.DataOperationRequest
import org.onap.cps.spi.exceptions.CpsException
import org.onap.cps.spi.model.ConditionProperties
-
import java.util.stream.Collectors
import org.onap.cps.utils.JsonObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
index d5b0915526..16f27d081b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
index 4f0132e2bd..523ec767c6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
@@ -28,26 +28,32 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
+import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpec {
- def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer()
- def logger = Spy(ListAppender<ILoggingEvent>)
-
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
ObjectMapper objectMapper
+ @SpringBean
+ DmiCmNotificationSubscriptionCacheHandler mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
+
+ def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer(mockDmiCmNotificationSubscriptionCacheHandler)
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
void setup() {
((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionDmiOutEventConsumer.class)).addAppender(logger)
logger.start()
@@ -78,6 +84,29 @@ class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpe
assert loggingEvent.formattedMessage == 'Cm Subscription with id : sub-1 handled by the dmi-plugin : test-dmi-plugin-name has the status : accepted'
}
+ def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() {
+ given: 'a cmNotificationSubscription event'
+ def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString())
+ def cmNotificationSubscriptionDmiOutEvent = new CmNotificationSubscriptionDmiOutEvent().withData(dmiOutEventData)
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(cmNotificationSubscriptionDmiOutEvent))
+ .withId('random-uuid')
+ .withType('subscriptionCreateResponse')
+ .withSource(URI.create('test-dmi-plugin-name'))
+ .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ when: 'the event is consumed'
+ objectUnderTest.consumeCmNotificationSubscriptionDmiOutEvent(consumerRecord)
+ then: 'correct number of calls to cache'
+ expectedCacheCalls * mockDmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ and: 'correct number of calls to persist cache'
+ expectedPersistenceCalls * mockDmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
+ where: 'the following parameters are used'
+ scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
+ 'Accepted Status' | CmNotificationSubscriptionStatus.ACCEPTED | '1' || 1 | 1
+ 'Rejected Status' | CmNotificationSubscriptionStatus.REJECTED | '2' || 1 | 0
+ }
+
def getLoggingEvent() {
return logger.list[0]
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
index 132c4bc4a7..47a1c89468 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -45,9 +47,11 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
ObjectMapper objectMapper
@SpringBean
InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
+ @SpringBean
+ CmNotificationSubscriptionPersistenceService mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService)
def testCache = [:]
- def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(testCache, mockInventoryPersistence)
+ def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(mockCmNotificationSubscriptionPersistenceService, testCache, mockInventoryPersistence)
CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent
def yangModelCmHandle1 = new YangModelCmHandle(id:'ch1',dmiServiceName:'dmi-1')
@@ -62,9 +66,9 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
def 'Load CM subscription event to cache'() {
given: 'a valid subscription event with Id'
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId();
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
and: 'list of predicates'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates();
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
when: 'a valid event object loaded in cache'
objectUnderTest.add(subscriptionId, predicates)
then: 'the cache contains the correct entry with #subscriptionId subscription ID'
@@ -115,6 +119,29 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
assert mapOfCMHandleIDsByDmi.get('dmi-2') == ['ch2'].toSet()
}
+ def 'Update subscription status in cache per DMI service name'() {
+ given: 'populated cache'
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ objectUnderTest.add(subscriptionId, predicates)
+ when: 'subscription status per dmi is updated in cache'
+ objectUnderTest.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmNotificationSubscriptionStatus.ACCEPTED)
+ then: 'verify status has been updated in cache'
+ def predicate = testCache.get(subscriptionId)
+ assert predicate.get('dmi-1').cmNotificationSubscriptionStatus == CmNotificationSubscriptionStatus.ACCEPTED
+ }
+
+ def 'Persist Cache into database per dmi'() {
+ given: 'populate cache'
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ objectUnderTest.add(subscriptionId, predicates)
+ when: 'subscription is persisted in database'
+ objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
+ then: 'persistence service is called the correct number of times per dmi'
+ 4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId)
+ }
+
def setUpTestEvent(){
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)