summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java38
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java13
-rwxr-xr-xcps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java12
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java)55
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java64
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java46
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java30
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java20
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java2
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperation.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiBatchOperation.java)24
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java79
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DmiServiceUrlBuilder.java32
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/EventDateTimeFormatter.java23
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/ResourceDataBatchRequestUtils.java126
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/context/CpsApplicationContext.java51
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java99
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java178
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/DataOperationDefinition.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/BatchOperationDefinition.java)4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/DataOperationRequest.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/ResourceDataBatchRequest.java)4
22 files changed, 659 insertions, 259 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
new file mode 100644
index 0000000000..9f7ef1e882
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api;
+
+import lombok.Getter;
+
+@Getter
+public enum NcmpEventResponseCode {
+
+ CODE_100("100", "cm handle id(s) not found"),
+ CODE_101("101", "cm handle(s) not ready");
+
+ private final String statusCode;
+ private final String statusMessage;
+
+ NcmpEventResponseCode(final String statusCode, final String statusMessage) {
+ this.statusCode = statusCode;
+ this.statusMessage = statusMessage;
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java
index 046c78879b..a65e3c4be1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java
@@ -29,10 +29,10 @@ import org.onap.cps.ncmp.api.impl.operations.OperationType;
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.models.CmHandleQueryApiParameters;
import org.onap.cps.ncmp.api.models.CmHandleQueryServiceParameters;
+import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.ncmp.api.models.DmiPluginRegistration;
import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.cps.ncmp.api.models.ResourceDataBatchRequest;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.model.ModuleDefinition;
import org.onap.cps.spi.model.ModuleReference;
@@ -83,15 +83,14 @@ public interface NetworkCmProxyDataService {
FetchDescendantsOption fetchDescendantsOption);
/**
- * Get resource data for batch of cm handles using dmi.
+ * Execute (async) data operation for group of cm handles using dmi.
*
* @param topicParamInQuery topic name for (triggering) async responses
- * @param resourceDataBatchRequest cm handle identifiers
+ * @param dataOperationRequest contains a list of operation definitions(multiple operations)
*/
- void requestResourceDataForCmHandleBatch(String topicParamInQuery,
- ResourceDataBatchRequest
- resourceDataBatchRequest,
- String requestId);
+ void executeDataOperationForCmHandles(String topicParamInQuery,
+ DataOperationRequest dataOperationRequest,
+ String requestId);
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
index 536775ec5c..2e9d7c2021 100755
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
@@ -60,10 +60,10 @@ import org.onap.cps.ncmp.api.models.CmHandleQueryApiParameters;
import org.onap.cps.ncmp.api.models.CmHandleQueryServiceParameters;
import org.onap.cps.ncmp.api.models.CmHandleRegistrationResponse;
import org.onap.cps.ncmp.api.models.CmHandleRegistrationResponse.RegistrationError;
+import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.ncmp.api.models.DmiPluginRegistration;
import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.cps.ncmp.api.models.ResourceDataBatchRequest;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.exceptions.AlreadyDefinedExceptionBatch;
import org.onap.cps.spi.exceptions.CpsException;
@@ -139,11 +139,11 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
}
@Override
- public void requestResourceDataForCmHandleBatch(final String topicParamInQuery,
- final ResourceDataBatchRequest
- resourceDataBatchRequest,
- final String requestId) {
- dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, resourceDataBatchRequest, requestId);
+ public void executeDataOperationForCmHandles(final String topicParamInQuery,
+ final DataOperationRequest
+ dataOperationRequest,
+ final String requestId) {
+ dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId);
}
@Override
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
index b76f86ebeb..514967574f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -45,7 +46,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
@EnableKafka
@RequiredArgsConstructor
-public class KafkaTemplateConfig<T> {
+public class KafkaConfig<T> {
private final KafkaProperties kafkaProperties;
@@ -76,6 +77,32 @@ public class KafkaTemplateConfig<T> {
}
/**
+ * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+ *
+ * @return an instance of legacy Kafka template.
+ */
+ @Bean
+ @Primary
+ public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+ final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+ /**
+ * A legacy concurrent kafka listener container factory.
+ *
+ * @return instance of Concurrent kafka listener factory
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(legacyEventConsumerFactory());
+ return containerFactory;
+ }
+
+ /**
* This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
* application.yml with CloudEventSerializer.
*
@@ -99,18 +126,6 @@ public class KafkaTemplateConfig<T> {
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
- /**
- * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
- *
- * @return an instance of legacy Kafka template.
- */
- @Bean
- @Primary
- public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
- final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
- kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
- return kafkaTemplate;
- }
/**
* A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
@@ -124,4 +139,18 @@ public class KafkaTemplateConfig<T> {
return kafkaTemplate;
}
+ /**
+ * A Concurrent CloudEvent kafka listener container factory.
+ *
+ * @return instance of Concurrent kafka listener factory
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+ cloudEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(cloudEventConsumerFactory());
+ return containerFactory;
+ }
+
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java
new file mode 100644
index 0000000000..544db50a55
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+import lombok.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
+import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext;
+import org.onap.cps.utils.JsonObjectMapper;
+
+@Builder(buildMethodName = "setCloudEvent")
+public class NcmpCloudEventBuilder {
+
+ private Object event;
+ private Map<String, String> extensions;
+ private String type;
+ @Builder.Default
+ private static final String EVENT_SPEC_VERSION_V1 = "1.0.0";
+
+ /**
+ * Creates ncmp cloud event with provided attributes.
+ *
+ * @return Cloud Event
+ */
+ public CloudEvent build() {
+ final JsonObjectMapper jsonObjectMapper = CpsApplicationContext.getCpsBean(JsonObjectMapper.class);
+ final CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
+ .withId(UUID.randomUUID().toString())
+ .withSource(URI.create("NCMP"))
+ .withType(type)
+ .withDataSchema(URI.create("urn:cps:" + type + ":" + EVENT_SPEC_VERSION_V1))
+ .withTime(EventDateTimeFormatter.toIsoOffsetDateTime(
+ EventDateTimeFormatter.getCurrentIsoFormattedDateTime()))
+ .withData(jsonObjectMapper.asJsonBytes(event));
+ extensions.entrySet().stream()
+ .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue()))
+ .forEach(extensionEntry ->
+ cloudEventBuilder.withExtension(extensionEntry.getKey(), extensionEntry.getValue()));
+ return cloudEventBuilder.build();
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
index b5ca176d1d..88ebd35c88 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
@@ -52,7 +52,8 @@ public class AvcEventConsumer {
*
* @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
final String newEventId = UUID.randomUUID().toString();
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
index a81f8fd731..c178700eed 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java
@@ -49,9 +49,7 @@ public class ResponseTimeoutTask implements Runnable {
private void generateAndSendResponse() {
final String subscriptionEventId = subscriptionClientId + subscriptionName;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
- final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
- subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName,
- dmiNames.isEmpty());
+ subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName);
forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
index 9e363f3cdd..1d87a057a7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java
@@ -21,6 +21,7 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import com.hazelcast.map.IMap;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -37,8 +38,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.event.model.SubscriptionEvent;
import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
@@ -55,6 +59,8 @@ public class SubscriptionEventForwarder {
private final EventsPublisher<SubscriptionEvent> eventsPublisher;
private final IMap<String, Set<String>> forwardedSubscriptionEventCache;
private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome;
+ private final SubscriptionEventMapper subscriptionEventMapper;
+ private final SubscriptionPersistence subscriptionPersistence;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@Value("${app.ncmp.avc.subscription-forward-topic-prefix}")
private String dmiAvcSubscriptionTopicPrefix;
@@ -83,11 +89,29 @@ public class SubscriptionEventForwarder {
final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName
= DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
+ findDmisAndRespond(subscriptionEvent, eventHeaders, cmHandleTargetsAsStrings,
+ dmiPropertiesPerCmHandleIdPerServiceName);
+ }
+
+ private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final Headers eventHeaders,
+ final List<String> cmHandleTargetsAsStrings,
+ final Map<String, Map<String, Map<String, String>>>
+ dmiPropertiesPerCmHandleIdPerServiceName) {
+ final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream()
+ .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList());
+
+ final List<String> targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings);
+ targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb);
+
final Set<String> dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet());
+
+ if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) {
+ updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb);
+ }
if (dmisToRespond.isEmpty()) {
final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID();
final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName();
- subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName, true);
+ subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName);
} else {
startResponseTimeout(subscriptionEvent, dmisToRespond);
forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders);
@@ -130,4 +154,24 @@ public class SubscriptionEventForwarder {
+ "-"
+ dmiName;
}
+
+ private void updatesCmHandlesToRejectedAndPersistSubscriptionEvent(
+ final SubscriptionEvent subscriptionEvent,
+ final List<String> targetCmHandlesDoesNotExistInDb) {
+ final YangModelSubscriptionEvent yangModelSubscriptionEvent =
+ subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent);
+ yangModelSubscriptionEvent.getPredicates()
+ .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb,
+ yangModelSubscriptionEvent));
+ subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);
+ }
+
+ private static List<YangModelSubscriptionEvent.TargetCmHandle> findRejectedCmHandles(
+ final List<String> targetCmHandlesDoesNotExistInDb,
+ final YangModelSubscriptionEvent yangModelSubscriptionEvent) {
+ return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream()
+ .filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId()))
+ .map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(),
+ SubscriptionStatus.REJECTED)).collect(Collectors.toList());
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
index a1860a6136..20df706c07 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java
@@ -21,6 +21,8 @@
package org.onap.cps.ncmp.api.impl.events.avcsubscription;
import com.hazelcast.map.IMap;
+import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
@@ -28,8 +30,11 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
+import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus;
+import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.api.models.SubscriptionEventResponse;
+import org.onap.cps.spi.model.DataNode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@@ -64,28 +69,35 @@ public class SubscriptionEventResponseConsumer {
log.info("subscription event response of clientId: {} is received.", clientId);
final String subscriptionName = subscriptionEventResponse.getSubscriptionName();
final String subscriptionEventId = clientId + subscriptionName;
- boolean isFullOutcomeResponse = false;
+ boolean createOutcomeResponse = false;
if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
dmiNames.remove(subscriptionEventResponse.getDmiName());
forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
- isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
-
- if (isFullOutcomeResponse) {
- forwardedSubscriptionEventCache.remove(subscriptionEventId);
- }
+ createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
}
if (subscriptionModelLoaderEnabled) {
updateSubscriptionEvent(subscriptionEventResponse);
}
- if (isFullOutcomeResponse && notificationFeatureEnabled) {
- subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName,
- isFullOutcomeResponse);
+ if (createOutcomeResponse
+ && notificationFeatureEnabled
+ && hasNoPendingCmHandles(clientId, subscriptionName)) {
+ subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName);
+ forwardedSubscriptionEventCache.remove(subscriptionEventId);
}
}
+ private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) {
+ final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent(
+ clientId, subscriptionName);
+ final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
+ DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes(
+ dataNodeSubscription);
+ return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ }
+
private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) {
final YangModelSubscriptionEvent yangModelSubscriptionEvent =
subscriptionEventResponseMapper
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
index 1bfc4ab28b..8fdff17944 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java
@@ -57,28 +57,32 @@ public class SubscriptionEventResponseOutcome {
*
* @param subscriptionClientId client id of the subscription.
* @param subscriptionName name of the subscription.
- * @param isFullOutcomeResponse the flag to decide on complete or partial response to be generated.
*/
- public void sendResponse(final String subscriptionClientId, final String subscriptionName,
- final boolean isFullOutcomeResponse) {
+ public void sendResponse(final String subscriptionClientId, final String subscriptionName) {
final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse(
- subscriptionClientId, subscriptionName, isFullOutcomeResponse);
+ subscriptionClientId, subscriptionName);
final Headers headers = new RecordHeaders();
final String subscriptionEventId = subscriptionClientId + subscriptionName;
outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic,
subscriptionEventId, headers, subscriptionEventOutcome);
}
- private SubscriptionEventOutcome generateResponse(final String subscriptionClientId, final String subscriptionName,
- final boolean isFullOutcomeResponse) {
- final Collection<DataNode> dataNodes = subscriptionPersistence.getDataNodesForSubscriptionEvent();
+ private SubscriptionEventOutcome generateResponse(final String subscriptionClientId,
+ final String subscriptionName) {
+ final Collection<DataNode> dataNodes =
+ subscriptionPersistence.getCmHandlesForSubscriptionEvent(subscriptionClientId, subscriptionName);
final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes);
final List<Collection<Serializable>> cmHandleIdToStatus =
DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves);
+ final Map<String, SubscriptionStatus> cmHandleIdToStatusMap =
+ DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus);
return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName,
- isFullOutcomeResponse);
+ isFullOutcomeResponse(cmHandleIdToStatusMap));
}
+ private boolean isFullOutcomeResponse(final Map<String, SubscriptionStatus> cmHandleIdToStatusMap) {
+ return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING);
+ }
private SubscriptionEventOutcome formSubscriptionOutcomeMessage(
final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
index 3c7c92b129..450bc8cce3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
@@ -108,7 +108,7 @@ public class LcmEventsCreator {
final LcmEvent lcmEvent = new LcmEvent();
lcmEvent.setEventId(UUID.randomUUID().toString());
lcmEvent.setEventCorrelationId(eventCorrelationId);
- lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentDateTime());
+ lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime());
lcmEvent.setEventSource("org.onap.ncmp");
lcmEvent.setEventType(lcmEventType.getEventType());
lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event");
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiBatchOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperation.java
index 76ad0f7b2e..6346379b22 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiBatchOperation.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperation.java
@@ -27,13 +27,13 @@ import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
import lombok.Getter;
-import org.onap.cps.ncmp.api.models.BatchOperationDefinition;
+import org.onap.cps.ncmp.api.models.DataOperationDefinition;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Getter
@Builder
@JsonPropertyOrder({"operation", "operationId", "datastore", "options", "resourceIdentifier", "cmHandles"})
-public class DmiBatchOperation {
+public class DmiDataOperation {
@JsonProperty("operation")
private OperationType operationType;
@@ -45,20 +45,20 @@ public class DmiBatchOperation {
private final List<CmHandle> cmHandles = new ArrayList<>();
/**
- * Create and initialise a (outgoing) DMI batch operation.
+ * Create and initialise a (outgoing) DMI data operation.
*
- * @param batchOperationDefinition batchOperationDefinition definition of incoming of batch request
+ * @param dataOperationDefinition definition of incoming of dataOperation request
* @return mapped dmi operation details
*/
- public static DmiBatchOperation buildDmiBatchRequestBodyWithoutCmHandles(
- final BatchOperationDefinition batchOperationDefinition) {
+ public static DmiDataOperation buildDmiDataOperationRequestBodyWithoutCmHandles(
+ final DataOperationDefinition dataOperationDefinition) {
- return DmiBatchOperation.builder()
- .operationType(OperationType.fromOperationName(batchOperationDefinition.getOperation()))
- .operationId(batchOperationDefinition.getOperationId())
- .datastore(DatastoreType.fromDatastoreName(batchOperationDefinition.getDatastore()).getDatastoreName())
- .options(batchOperationDefinition.getOptions())
- .resourceIdentifier(batchOperationDefinition.getResourceIdentifier())
+ return DmiDataOperation.builder()
+ .operationType(OperationType.fromOperationName(dataOperationDefinition.getOperation()))
+ .operationId(dataOperationDefinition.getOperationId())
+ .datastore(DatastoreType.fromDatastoreName(dataOperationDefinition.getDatastore()).getDatastoreName())
+ .options(dataOperationDefinition.getOptions())
+ .resourceIdentifier(dataOperationDefinition.getResourceIdentifier())
.build();
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
index 3e8d40a83b..b4784f418f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
@@ -34,11 +34,11 @@ import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
import org.onap.cps.ncmp.api.impl.config.NcmpConfiguration;
import org.onap.cps.ncmp.api.impl.executor.TaskExecutor;
import org.onap.cps.ncmp.api.impl.utils.DmiServiceUrlBuilder;
-import org.onap.cps.ncmp.api.impl.utils.ResourceDataBatchRequestUtils;
+import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.inventory.CmHandleState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.onap.cps.ncmp.api.models.ResourceDataBatchRequest;
+import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.spi.exceptions.CpsException;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.http.ResponseEntity;
@@ -118,24 +118,24 @@ public class DmiDataOperations extends DmiOperations {
* The data wil be returned as message on the topic specified.
*
* @param topicParamInQuery topic name for (triggering) async responses
- * @param resourceDataBatchRequest batch request for resource data
+ * @param dataOperationRequest data operation request to execute operations
* @param requestId requestId for as a response
*/
public void requestResourceDataFromDmi(final String topicParamInQuery,
- final ResourceDataBatchRequest resourceDataBatchRequest,
+ final DataOperationRequest dataOperationRequest,
final String requestId) {
final Set<String> cmHandlesIds
- = getDistinctCmHandleIdsFromBatchRequest(resourceDataBatchRequest);
+ = getDistinctCmHandleIdsFromDataOperationRequest(dataOperationRequest);
final Collection<YangModelCmHandle> yangModelCmHandles
- = getYangModelCmHandlesInReadyState(cmHandlesIds);
+ = inventoryPersistence.getYangModelCmHandles(cmHandlesIds);
- final Map<String, List<DmiBatchOperation>> operationsOutPerDmiServiceName
- = ResourceDataBatchRequestUtils.processPerOperationInBatchRequest(resourceDataBatchRequest,
- yangModelCmHandles);
+ final Map<String, List<DmiDataOperation>> operationsOutPerDmiServiceName
+ = ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(topicParamInQuery,
+ requestId, dataOperationRequest, yangModelCmHandles);
- buildBatchRequestUrlAndSendToDmiService(topicParamInQuery, requestId, operationsOutPerDmiServiceName);
+ buildDataOperationRequestUrlAndSendToDmiService(topicParamInQuery, requestId, operationsOutPerDmiServiceName);
}
/**
@@ -196,13 +196,13 @@ public class DmiDataOperations extends DmiOperations {
cmHandleId));
}
- private String getDmiServiceBatchRequestUrl(final String dmiServiceName,
- final String topicParamInQuery,
- final String requestId) {
- final MultiValueMap<String, String> batchRequestQueryParams = dmiServiceUrlBuilder
- .getBatchRequestQueryParams(topicParamInQuery, requestId);
- return dmiServiceUrlBuilder.getBatchRequestUrl(batchRequestQueryParams,
- dmiServiceUrlBuilder.populateBatchUriVariables(dmiServiceName));
+ private String getDmiServiceDataOperationRequestUrl(final String dmiServiceName,
+ final String topicParamInQuery,
+ final String requestId) {
+ final MultiValueMap<String, String> dataOperationRequestQueryParams = dmiServiceUrlBuilder
+ .getDataOperationRequestQueryParams(topicParamInQuery, requestId);
+ return dmiServiceUrlBuilder.getDataOperationRequestUrl(dataOperationRequestQueryParams,
+ dmiServiceUrlBuilder.populateDataOperationRequestUriVariables(dmiServiceName));
}
private void validateIfCmHandleStateReady(final YangModelCmHandle yangModelCmHandle,
@@ -214,41 +214,34 @@ public class DmiDataOperations extends DmiOperations {
}
}
- private static Set<String> getDistinctCmHandleIdsFromBatchRequest(final ResourceDataBatchRequest
- resourceDataBatchRequest) {
- return resourceDataBatchRequest.getBatchOperationDefinitions().stream()
- .flatMap(batchOperationDefinition ->
- batchOperationDefinition.getCmHandleIds().stream()).collect(Collectors.toSet());
+ private static Set<String> getDistinctCmHandleIdsFromDataOperationRequest(final DataOperationRequest
+ dataOperationRequest) {
+ return dataOperationRequest.getDataOperationDefinitions().stream()
+ .flatMap(dataOperationDefinition ->
+ dataOperationDefinition.getCmHandleIds().stream()).collect(Collectors.toSet());
}
- private Collection<YangModelCmHandle> getYangModelCmHandlesInReadyState(final Set<String> requestedCmHandleIds) {
- // TODO Need to publish an error response to client given topic.
- // Code should be implemented into https://jira.onap.org/browse/CPS-1614 (
- // NCMP : Error handling for non-ready cm handle state)
- return inventoryPersistence.getYangModelCmHandles(requestedCmHandleIds).stream()
- .filter(yangModelCmHandle -> yangModelCmHandle.getCompositeState().getCmHandleState()
- == CmHandleState.READY).collect(Collectors.toList());
- }
-
- private void buildBatchRequestUrlAndSendToDmiService(final String topicParamInQuery,
- final String requestId,
- final Map<String, List<DmiBatchOperation>>
+ private void buildDataOperationRequestUrlAndSendToDmiService(final String topicParamInQuery,
+ final String requestId,
+ final Map<String, List<DmiDataOperation>>
groupsOutPerDmiServiceName) {
groupsOutPerDmiServiceName.entrySet().forEach(groupsOutPerDmiServiceNameEntry -> {
final String dmiServiceName = groupsOutPerDmiServiceNameEntry.getKey();
- final List<DmiBatchOperation> dmiBatchRequestBodies = groupsOutPerDmiServiceNameEntry.getValue();
- final String dmiBatchResourceDataUrl = getDmiServiceBatchRequestUrl(dmiServiceName, topicParamInQuery,
- requestId);
- sendBatchRequestToDmiService(dmiBatchResourceDataUrl, dmiBatchRequestBodies);
+ final List<DmiDataOperation> dmiDataOperationRequestBodies = groupsOutPerDmiServiceNameEntry.getValue();
+ final String dmiDataOperationResourceUrl =
+ getDmiServiceDataOperationRequestUrl(dmiServiceName, topicParamInQuery, requestId);
+ sendDataOperationRequestToDmiService(dmiDataOperationResourceUrl, dmiDataOperationRequestBodies);
});
}
- private void sendBatchRequestToDmiService(final String batchResourceDataUrl,
- final List<DmiBatchOperation> dmiBatchRequestBodies) {
- final String batchRequestBodiesAsJsonString = jsonObjectMapper.asJsonString(dmiBatchRequestBodies);
- TaskExecutor.executeTask(() -> dmiRestClient.postOperationWithJsonData(batchResourceDataUrl,
- batchRequestBodiesAsJsonString, READ), DEFAULT_ASYNC_TASK_EXECUTOR_TIMEOUT_IN_MILLISECONDS)
+ private void sendDataOperationRequestToDmiService(final String dataOperationResourceUrl,
+ final List<DmiDataOperation> dmiDataOperationRequestBodies) {
+ final String dataOperationRequestBodiesAsJsonString =
+ jsonObjectMapper.asJsonString(dmiDataOperationRequestBodies);
+ TaskExecutor.executeTask(() -> dmiRestClient.postOperationWithJsonData(dataOperationResourceUrl,
+ dataOperationRequestBodiesAsJsonString, READ),
+ DEFAULT_ASYNC_TASK_EXECUTOR_TIMEOUT_IN_MILLISECONDS)
.whenCompleteAsync(this::handleTaskCompletion);
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
index 8d44592ae2..f42a378fcb 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java
@@ -93,4 +93,15 @@ public class DataNodeHelper {
}
return resultMap;
}
+
+ /**
+ * Extracts the mapping of cm handle id to status from data node collection.
+ *
+ * @param dataNodes as a collection
+ * @return cm handle id to status mapping
+ */
+ public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMapFromDataNodes(
+ final Collection<DataNode> dataNodes) {
+ return getCmHandleIdToStatusMap(getCmHandleIdToStatus(getDataNodeLeaves(dataNodes)));
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DmiServiceUrlBuilder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DmiServiceUrlBuilder.java
index 5c6fa9f0b0..d855442c53 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DmiServiceUrlBuilder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DmiServiceUrlBuilder.java
@@ -53,17 +53,17 @@ public class DmiServiceUrlBuilder {
}
/**
- * This method builds batch request url.
+ * This method builds data operation request url.
*
- * @param batchRequestQueryParams query param map as key, value pair
- * @param batchRequestUriVariables uri param map as key (placeholder), value pair
- * @return {@code String} batch request url as string
+ * @param dataoperationRequestQueryParams query param map as key, value pair
+ * @param dataoperationRequestUriVariables uri param map as key (placeholder), value pair
+ * @return {@code String} data operation request url as string
*/
- public String getBatchRequestUrl(final MultiValueMap<String, String> batchRequestQueryParams,
- final Map<String, Object> batchRequestUriVariables) {
- return getBatchResourceDataBasePathUriBuilder()
- .queryParams(batchRequestQueryParams)
- .uriVariables(batchRequestUriVariables)
+ public String getDataOperationRequestUrl(final MultiValueMap<String, String> dataoperationRequestQueryParams,
+ final Map<String, Object> dataoperationRequestUriVariables) {
+ return getDataOperationResourceDataBasePathUriBuilder()
+ .queryParams(dataoperationRequestQueryParams)
+ .uriVariables(dataoperationRequestUriVariables)
.buildAndExpand().toUriString();
}
@@ -82,11 +82,11 @@ public class DmiServiceUrlBuilder {
}
/**
- * This method creates the dmi service url builder object with path variables for batch of cm handles.
+ * This method creates the dmi service url builder object with path variables for data operation request.
*
* @return {@code UriComponentsBuilder} dmi service url builder object
*/
- public UriComponentsBuilder getBatchResourceDataBasePathUriBuilder() {
+ public UriComponentsBuilder getDataOperationResourceDataBasePathUriBuilder() {
return UriComponentsBuilder.newInstance()
.path("{dmiServiceName}")
.pathSegment("{dmiBasePath}")
@@ -116,12 +116,12 @@ public class DmiServiceUrlBuilder {
}
/**
- * This method populates uri variables for batch request.
+ * This method populates uri variables for data operation request.
*
* @param dmiServiceName dmi service name
* @return {@code Map<String, Object>} uri variables as map
*/
- public Map<String, Object> populateBatchUriVariables(final String dmiServiceName) {
+ public Map<String, Object> populateDataOperationRequestUriVariables(final String dmiServiceName) {
final Map<String, Object> uriVariables = new HashMap<>();
final String dmiBasePath = dmiProperties.getDmiBasePath();
uriVariables.put("dmiServiceName", dmiServiceName);
@@ -151,14 +151,14 @@ public class DmiServiceUrlBuilder {
}
/**
- * This method is used to populate map from query params for batch request.
+ * This method is used to populate map from query params for data operation request.
*
* @param topicParamInQuery topic into url param
* @param requestId unique id of response for valid topic
* @return all valid query params as map
*/
- public MultiValueMap<String, String> getBatchRequestQueryParams(final String topicParamInQuery,
- final String requestId) {
+ public MultiValueMap<String, String> getDataOperationRequestQueryParams(final String topicParamInQuery,
+ final String requestId) {
final MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
getQueryParamConsumer().accept("topic", topicParamInQuery, queryParams);
getQueryParamConsumer().accept("requestId", requestId, queryParams);
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/EventDateTimeFormatter.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/EventDateTimeFormatter.java
index acc4057d9d..5dd6827126 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/EventDateTimeFormatter.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/EventDateTimeFormatter.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
+ * Copyright (C) 2022-2023 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,23 +20,28 @@
package org.onap.cps.ncmp.api.impl.utils;
+import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class EventDateTimeFormatter {
+public interface EventDateTimeFormatter {
- private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+ String ISO_TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+
+ DateTimeFormatter ISO_TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern(ISO_TIMESTAMP_PATTERN);
/**
* Gets current date time.
*
* @return the current date time
*/
- public static String getCurrentDateTime() {
- return ZonedDateTime.now()
- .format(DateTimeFormatter.ofPattern(DATE_TIME_FORMAT));
+ static String getCurrentIsoFormattedDateTime() {
+ return ZonedDateTime.now().format(ISO_TIMESTAMP_FORMATTER);
+ }
+
+ static OffsetDateTime toIsoOffsetDateTime(final String dateTimestampAsString) {
+ return StringUtils.isNotBlank(dateTimestampAsString)
+ ? OffsetDateTime.parse(dateTimestampAsString, ISO_TIMESTAMP_FORMATTER) : null;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/ResourceDataBatchRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/ResourceDataBatchRequestUtils.java
deleted file mode 100644
index e4c9bfb39b..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/ResourceDataBatchRequestUtils.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.utils;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.impl.operations.CmHandle;
-import org.onap.cps.ncmp.api.impl.operations.DmiBatchOperation;
-import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
-import org.onap.cps.ncmp.api.models.BatchOperationDefinition;
-import org.onap.cps.ncmp.api.models.ResourceDataBatchRequest;
-
-@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class ResourceDataBatchRequestUtils {
-
- private static final String UNKNOWN_SERVICE_NAME = null;
-
- /**
- * Create a list of DMI batch operation per DMI service (name).
- *
- * @param resourceDataBatchRequestIn incoming batch request details for resource data
- * @param yangModelCmHandles involved cm handles represented as YangModelCmHandle (incl. metadata)
- *
- * @return {@code Map<String, List<DmiBatchOperation>>} Create a list of DMI batch operation per DMI service (name).
- */
- public static Map<String, List<DmiBatchOperation>> processPerOperationInBatchRequest(
- final ResourceDataBatchRequest resourceDataBatchRequestIn,
- final Collection<YangModelCmHandle> yangModelCmHandles) {
-
- final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName =
- DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
-
- final Map<String, String> dmiServiceNamesPerCmHandleId =
- getDmiServiceNamesPerCmHandleId(dmiPropertiesPerCmHandleIdPerServiceName);
-
- final Map<String, List<DmiBatchOperation>> dmiBatchOperationsOutPerDmiServiceName = new HashMap<>();
-
- for (final BatchOperationDefinition batchOperationDefinitionIn :
- resourceDataBatchRequestIn.getBatchOperationDefinitions()) {
- for (final String cmHandleId : batchOperationDefinitionIn.getCmHandleIds()) {
- final String dmiServiceName = dmiServiceNamesPerCmHandleId.get(cmHandleId);
- final Map<String, String> cmHandleIdProperties
- = dmiPropertiesPerCmHandleIdPerServiceName.get(dmiServiceName).get(cmHandleId);
- if (cmHandleIdProperties == null) {
- publishErrorMessageToClientTopic(cmHandleId);
- } else {
- final DmiBatchOperation dmiBatchOperationOut = getOrAddDmiBatchOperation(dmiServiceName,
- batchOperationDefinitionIn, dmiBatchOperationsOutPerDmiServiceName);
- final CmHandle cmHandle = CmHandle.buildCmHandleWithProperties(cmHandleId, cmHandleIdProperties);
- dmiBatchOperationOut.getCmHandles().add(cmHandle);
- }
- }
- }
- return dmiBatchOperationsOutPerDmiServiceName;
- }
-
- private static void publishErrorMessageToClientTopic(final String requestedCmHandleId) {
- log.warn("cm handle {} not found", requestedCmHandleId);
- // TODO Need to publish an error response to client given topic.
- // Code should be implemented into https://jira.onap.org/browse/CPS-1583 (
- // NCMP : Handle non-existing cm handles)
- }
-
- private static Map<String, String> getDmiServiceNamesPerCmHandleId(
- final Map<String, Map<String, Map<String, String>>> dmiDmiPropertiesPerCmHandleIdPerServiceName) {
- final Map<String, String> dmiServiceNamesPerCmHandleId = new HashMap<>();
- for (final Map.Entry<String, Map<String, Map<String, String>>> dmiDmiPropertiesEntry
- : dmiDmiPropertiesPerCmHandleIdPerServiceName.entrySet()) {
- final String dmiServiceName = dmiDmiPropertiesEntry.getKey();
- final Set<String> cmHandleIds = dmiDmiPropertiesPerCmHandleIdPerServiceName.get(dmiServiceName).keySet();
- for (final String cmHandleId : cmHandleIds) {
- dmiServiceNamesPerCmHandleId.put(cmHandleId, dmiServiceName);
- }
- }
- dmiDmiPropertiesPerCmHandleIdPerServiceName.put(UNKNOWN_SERVICE_NAME, Collections.emptyMap());
- return dmiServiceNamesPerCmHandleId;
- }
-
- private static DmiBatchOperation getOrAddDmiBatchOperation(final String dmiServiceName,
- final BatchOperationDefinition
- batchOperationDefinitionIn,
- final Map<String, List<DmiBatchOperation>>
- dmiBatchOperationsOutPerDmiServiceName) {
- dmiBatchOperationsOutPerDmiServiceName
- .computeIfAbsent(dmiServiceName, dmiServiceNameAsKey -> new ArrayList<>());
- final List<DmiBatchOperation> dmiBatchOperationsOut
- = dmiBatchOperationsOutPerDmiServiceName.get(dmiServiceName);
- final boolean isNewOperation = dmiBatchOperationsOut.isEmpty()
- || !dmiBatchOperationsOut.get(dmiBatchOperationsOut.size() - 1).getOperationId()
- .equals(batchOperationDefinitionIn.getOperationId());
- if (isNewOperation) {
- final DmiBatchOperation newDmiBatchOperationOut =
- DmiBatchOperation.buildDmiBatchRequestBodyWithoutCmHandles(batchOperationDefinitionIn);
- dmiBatchOperationsOut.add(newDmiBatchOperationOut);
- return newDmiBatchOperationOut;
- }
- return dmiBatchOperationsOut.get(dmiBatchOperationsOut.size() - 1);
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/context/CpsApplicationContext.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/context/CpsApplicationContext.java
new file mode 100644
index 0000000000..b14cf0d0db
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/context/CpsApplicationContext.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils.context;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class CpsApplicationContext implements ApplicationContextAware {
+
+ private static ApplicationContext applicationContext;
+
+ /**
+ * Returns the spring managed cps bean instance of the given class type if it exists.
+ * Returns null otherwise.
+ *
+ * @param cpsBeanClass cps class type
+ * @return requested bean instance
+ */
+ public static <T extends Object> T getCpsBean(final Class<T> cpsBeanClass) {
+ return applicationContext.getBean(cpsBeanClass);
+ }
+
+ @Override
+ public void setApplicationContext(final ApplicationContext cpsApplicationContext) {
+ setCpsApplicationContext(cpsApplicationContext);
+ }
+
+ private static synchronized void setCpsApplicationContext(final ApplicationContext cpsApplicationContext) {
+ CpsApplicationContext.applicationContext = cpsApplicationContext;
+ }
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java
new file mode 100644
index 0000000000..2d9a51b844
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils.data.operation;
+
+import io.cloudevents.CloudEvent;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.NcmpEventResponseCode;
+import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder;
+import org.onap.cps.ncmp.events.async1_0_0.Data;
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import org.onap.cps.ncmp.events.async1_0_0.Response;
+import org.springframework.util.MultiValueMap;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DataOperationEventCreator {
+
+ /**
+ * Creates data operation event.
+ *
+ * @param clientTopic topic the client wants to use for responses
+ * @param requestId unique identifier per request
+ * @param cmHandleIdsPerResponseCodesPerOperationId map of cm handles per operation response per response code
+ * @return Cloud Event
+ */
+ public static CloudEvent createDataOperationEvent(final String clientTopic,
+ final String requestId,
+ final MultiValueMap<String,
+ Map<NcmpEventResponseCode, List<String>>>
+ cmHandleIdsPerResponseCodesPerOperationId) {
+ final DataOperationEvent dataOperationEvent = new DataOperationEvent();
+ final Data data = createPayloadFromDataOperationResponses(cmHandleIdsPerResponseCodesPerOperationId);
+ dataOperationEvent.setData(data);
+ final Map<String, String> extensions = createDataOperationExtensions(requestId, clientTopic);
+ return NcmpCloudEventBuilder.builder().type(DataOperationEvent.class.getName())
+ .event(dataOperationEvent).extensions(extensions).setCloudEvent().build();
+ }
+
+ private static Data createPayloadFromDataOperationResponses(final MultiValueMap<String, Map<NcmpEventResponseCode,
+ List<String>>> cmHandleIdsPerOperationIdPerResponseCode) {
+ final Data data = new Data();
+ final List<org.onap.cps.ncmp.events.async1_0_0.Response> responses = new ArrayList<>();
+ cmHandleIdsPerOperationIdPerResponseCode.entrySet().forEach(cmHandleIdsPerOperationIdPerResponseCodeEntries ->
+ cmHandleIdsPerOperationIdPerResponseCodeEntries.getValue().forEach(cmHandleIdsPerResponseCodeEntries ->
+ responses.addAll(createResponseFromDataOperationResponses(
+ cmHandleIdsPerOperationIdPerResponseCodeEntries.getKey(),
+ cmHandleIdsPerResponseCodeEntries)
+ )));
+ data.setResponses(responses);
+ return data;
+ }
+
+ private static List<Response> createResponseFromDataOperationResponses(
+ final String operationId,
+ final Map<NcmpEventResponseCode, List<String>> cmHandleIdsPerResponseCodeEntries) {
+ final List<org.onap.cps.ncmp.events.async1_0_0.Response> responses = new ArrayList<>();
+ cmHandleIdsPerResponseCodeEntries.entrySet()
+ .forEach(cmHandleIdsPerResponseCodeEntry -> {
+ final Response response = new Response();
+ response.setOperationId(operationId);
+ response.setStatusCode(cmHandleIdsPerResponseCodeEntry.getKey().getStatusCode());
+ response.setStatusMessage(cmHandleIdsPerResponseCodeEntry.getKey().getStatusMessage());
+ response.setIds(cmHandleIdsPerResponseCodeEntry.getValue());
+ responses.add(response);
+ });
+ return responses;
+ }
+
+ private static Map<String, String> createDataOperationExtensions(final String requestId, final String clientTopic) {
+ final Map<String, String> extensions = new HashMap<>();
+ extensions.put("correlationid", requestId);
+ extensions.put("destination", clientTopic);
+ return extensions;
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
new file mode 100644
index 0000000000..957f48a862
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -0,0 +1,178 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.utils.data.operation;
+
+import io.cloudevents.CloudEvent;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.NcmpEventResponseCode;
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
+import org.onap.cps.ncmp.api.impl.operations.CmHandle;
+import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation;
+import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer;
+import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext;
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.inventory.CmHandleState;
+import org.onap.cps.ncmp.api.models.DataOperationDefinition;
+import org.onap.cps.ncmp.api.models.DataOperationRequest;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ResourceDataOperationRequestUtils {
+
+ private static final String UNKNOWN_SERVICE_NAME = null;
+
+ /**
+ * Create a list of DMI data operation per DMI service (name).
+ *
+ * @param topicParamInQuery client given topic
+ * @param requestId unique identifier per request
+ * @param dataOperationRequestIn incoming data operation request details
+ * @param yangModelCmHandles involved cm handles represented as YangModelCmHandle (incl. metadata)
+ * @return {@code Map<String, List<DmiBatchOperation>>} Create a list of DMI batch operation per DMI service (name).
+ */
+ public static Map<String, List<DmiDataOperation>> processPerDefinitionInDataOperationsRequest(
+ final String topicParamInQuery,
+ final String requestId,
+ final DataOperationRequest dataOperationRequestIn,
+ final Collection<YangModelCmHandle> yangModelCmHandles) {
+
+ final Map<String, List<DmiDataOperation>> dmiDataOperationsOutPerDmiServiceName = new HashMap<>();
+ final MultiValueMap<String, Map<NcmpEventResponseCode, List<String>>> cmHandleIdsPerOperationIdPerResponseCode
+ = new LinkedMultiValueMap<>();
+ final Set<String> nonReadyCmHandleIdsLookup = filterAndGetNonReadyCmHandleIds(yangModelCmHandles);
+
+ final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName =
+ DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles);
+
+ final Map<String, String> dmiServiceNamesPerCmHandleId =
+ getDmiServiceNamesPerCmHandleId(dmiPropertiesPerCmHandleIdPerServiceName);
+
+ for (final DataOperationDefinition dataOperationDefinitionIn :
+ dataOperationRequestIn.getDataOperationDefinitions()) {
+ final List<String> nonExistingCmHandleIds = new ArrayList<>();
+ final List<String> nonReadyCmHandleIds = new ArrayList<>();
+ for (final String cmHandleId : dataOperationDefinitionIn.getCmHandleIds()) {
+ if (nonReadyCmHandleIdsLookup.contains(cmHandleId)) {
+ nonReadyCmHandleIds.add(cmHandleId);
+ } else {
+ final String dmiServiceName = dmiServiceNamesPerCmHandleId.get(cmHandleId);
+ final Map<String, String> cmHandleIdProperties
+ = dmiPropertiesPerCmHandleIdPerServiceName.get(dmiServiceName).get(cmHandleId);
+ if (cmHandleIdProperties == null) {
+ nonExistingCmHandleIds.add(cmHandleId);
+ } else {
+ final DmiDataOperation dmiBatchOperationOut = getOrAddDmiBatchOperation(dmiServiceName,
+ dataOperationDefinitionIn, dmiDataOperationsOutPerDmiServiceName);
+ final CmHandle cmHandle = CmHandle.buildCmHandleWithProperties(cmHandleId,
+ cmHandleIdProperties);
+ dmiBatchOperationOut.getCmHandles().add(cmHandle);
+ }
+ }
+ }
+ populateCmHandleIdsPerOperationIdPerResponseCode(cmHandleIdsPerOperationIdPerResponseCode,
+ dataOperationDefinitionIn.getOperationId(), NcmpEventResponseCode.CODE_100, nonExistingCmHandleIds);
+ populateCmHandleIdsPerOperationIdPerResponseCode(cmHandleIdsPerOperationIdPerResponseCode,
+ dataOperationDefinitionIn.getOperationId(), NcmpEventResponseCode.CODE_101, nonReadyCmHandleIds);
+ }
+ if (!cmHandleIdsPerOperationIdPerResponseCode.isEmpty()) {
+ publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerOperationIdPerResponseCode);
+ }
+ return dmiDataOperationsOutPerDmiServiceName;
+ }
+
+ @Async
+ private static void publishErrorMessageToClientTopic(final String clientTopic,
+ final String requestId,
+ final MultiValueMap<String,
+ Map<NcmpEventResponseCode, List<String>>>
+ cmHandleIdsPerOperationIdPerResponseCode) {
+ final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
+ requestId, cmHandleIdsPerOperationIdPerResponseCode);
+ final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+ eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ }
+
+ private static Map<String, String> getDmiServiceNamesPerCmHandleId(
+ final Map<String, Map<String, Map<String, String>>> dmiDmiPropertiesPerCmHandleIdPerServiceName) {
+ final Map<String, String> dmiServiceNamesPerCmHandleId = new HashMap<>();
+ for (final Map.Entry<String, Map<String, Map<String, String>>> dmiDmiPropertiesEntry
+ : dmiDmiPropertiesPerCmHandleIdPerServiceName.entrySet()) {
+ final String dmiServiceName = dmiDmiPropertiesEntry.getKey();
+ final Set<String> cmHandleIds = dmiDmiPropertiesPerCmHandleIdPerServiceName.get(dmiServiceName).keySet();
+ for (final String cmHandleId : cmHandleIds) {
+ dmiServiceNamesPerCmHandleId.put(cmHandleId, dmiServiceName);
+ }
+ }
+ dmiDmiPropertiesPerCmHandleIdPerServiceName.put(UNKNOWN_SERVICE_NAME, Collections.emptyMap());
+ return dmiServiceNamesPerCmHandleId;
+ }
+
+ private static DmiDataOperation getOrAddDmiBatchOperation(final String dmiServiceName,
+ final DataOperationDefinition
+ dataOperationDefinitionIn,
+ final Map<String, List<DmiDataOperation>>
+ dmiBatchOperationsOutPerDmiServiceName) {
+ dmiBatchOperationsOutPerDmiServiceName
+ .computeIfAbsent(dmiServiceName, dmiServiceNameAsKey -> new ArrayList<>());
+ final List<DmiDataOperation> dmiBatchOperationsOut
+ = dmiBatchOperationsOutPerDmiServiceName.get(dmiServiceName);
+ final boolean isNewOperation = dmiBatchOperationsOut.isEmpty()
+ || !dmiBatchOperationsOut.get(dmiBatchOperationsOut.size() - 1).getOperationId()
+ .equals(dataOperationDefinitionIn.getOperationId());
+ if (isNewOperation) {
+ final DmiDataOperation newDmiBatchOperationOut =
+ DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn);
+ dmiBatchOperationsOut.add(newDmiBatchOperationOut);
+ return newDmiBatchOperationOut;
+ }
+ return dmiBatchOperationsOut.get(dmiBatchOperationsOut.size() - 1);
+ }
+
+ private static Set<String> filterAndGetNonReadyCmHandleIds(final Collection<YangModelCmHandle> yangModelCmHandles) {
+ return yangModelCmHandles.stream()
+ .filter(yangModelCmHandle -> yangModelCmHandle.getCompositeState().getCmHandleState()
+ != CmHandleState.READY).map(YangModelCmHandle::getId).collect(Collectors.toSet());
+ }
+
+ private static void populateCmHandleIdsPerOperationIdPerResponseCode(final MultiValueMap<String,
+ Map<NcmpEventResponseCode, List<String>>> cmHandleIdsPerOperationIdByResponseCode,
+ final String operationId,
+ final NcmpEventResponseCode
+ ncmpEventResponseCode,
+ final List<String> cmHandleIds) {
+ if (!cmHandleIds.isEmpty()) {
+ cmHandleIdsPerOperationIdByResponseCode.add(operationId, Map.of(ncmpEventResponseCode, cmHandleIds));
+ }
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/BatchOperationDefinition.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/DataOperationDefinition.java
index 04075b3b7c..8182fbfcc8 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/BatchOperationDefinition.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/DataOperationDefinition.java
@@ -35,7 +35,7 @@ import lombok.Setter;
@EqualsAndHashCode
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class BatchOperationDefinition {
+public class DataOperationDefinition {
private String operation;
private String operationId;
@@ -45,5 +45,5 @@ public class BatchOperationDefinition {
@JsonProperty("targetIds")
@Valid
- private List<String> cmHandleIds = new ArrayList();
+ private List<String> cmHandleIds = new ArrayList<>();
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/ResourceDataBatchRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/DataOperationRequest.java
index 7af107c37a..6fa7d5c755 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/ResourceDataBatchRequest.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/DataOperationRequest.java
@@ -35,9 +35,9 @@ import lombok.Setter;
@EqualsAndHashCode
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonIgnoreProperties(ignoreUnknown = true)
-public class ResourceDataBatchRequest {
+public class DataOperationRequest {
@JsonProperty("operations")
@Valid
- private List<BatchOperationDefinition> batchOperationDefinitions = Collections.emptyList();
+ private List<DataOperationDefinition> dataOperationDefinitions = Collections.emptyList();
}