From 2fe802a8bdda1e628c4558f9079439893e7bd52e Mon Sep 17 00:00:00 2001 From: mpriyank Date: Tue, 8 Aug 2023 15:27:18 +0100 Subject: Refactoring Subscription Create LCM use case - Client to NCMP: CmSubscriptionNcmpInEvent - NCMP to DMI: CmSubscriptionDmiInEvent - DMI to NCMP: CmSubscriptionDmiOutEvent - NCMP to Client: CmSubscriptionNcmpOutEvent - code package changed from avcsubscription to cmsubscription - Other classes name as per the events naming above - Test classes refactored - NO LOGIC changes incorporated in this patch Issue-ID: CPS-1831 Change-Id: Id5ad5f799007deaaf6d6fc0f402c130339263d09 Signed-off-by: mpriyank --- .../ClientSubscriptionEventMapper.java | 34 ---- .../avcsubscription/ResponseTimeoutTask.java | 52 ------ .../avcsubscription/SubscriptionEventConsumer.java | 95 ---------- .../SubscriptionEventForwarder.java | 192 --------------------- .../avcsubscription/SubscriptionEventMapper.java | 52 ------ .../SubscriptionEventResponseConsumer.java | 117 ------------- .../SubscriptionEventResponseMapper.java | 55 ------ .../SubscriptionEventResponseOutcome.java | 151 ---------------- .../avcsubscription/SubscriptionOutcomeMapper.java | 105 ----------- .../CmSubscriptionDmiOutEventConsumer.java | 118 +++++++++++++ ...OutEventToCmSubscriptionNcmpOutEventMapper.java | 105 +++++++++++ ...OutEventToYangModelSubscriptionEventMapper.java | 55 ++++++ .../CmSubscriptionNcmpInEventConsumer.java | 97 +++++++++++ .../CmSubscriptionNcmpInEventForwarder.java | 191 ++++++++++++++++++++ .../CmSubscriptionNcmpInEventMapper.java | 52 ++++++ ...cmpInEventToCmSubscriptionDmiInEventMapper.java | 35 ++++ .../CmSubscriptionNcmpOutEventPublisher.java | 153 ++++++++++++++++ .../events/cmsubscription/ResponseTimeoutTask.java | 52 ++++++ .../impl/utils/CmSubscriptionEventCloudMapper.java | 86 +++++++++ .../impl/utils/SubscriptionEventCloudMapper.java | 90 ---------- .../SubscriptionEventResponseCloudMapper.java | 18 +- .../impl/utils/SubscriptionOutcomeCloudMapper.java | 12 +- 22 files changed, 959 insertions(+), 958 deletions(-) delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventConsumer.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventForwarder.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpOutEventPublisher.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/ResponseTimeoutTask.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/CmSubscriptionEventCloudMapper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java (limited to 'cps-ncmp-service/src/main/java/org/onap') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java deleted file mode 100644 index 59b1d09c7b..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; - -@Mapper(componentModel = "spring") -public interface ClientSubscriptionEventMapper { - - @Mapping(target = "data.predicates.targets", ignore = true) - org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent toNcmpSubscriptionEvent( - SubscriptionEvent subscriptionEvent); - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java deleted file mode 100644 index e3f529787a..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import com.hazelcast.map.IMap; -import java.util.Set; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; - -@Slf4j -@RequiredArgsConstructor -public class ResponseTimeoutTask implements Runnable { - - private final IMap> forwardedSubscriptionEventCache; - private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; - private final SubscriptionEventResponse subscriptionEventResponse; - - @Override - public void run() { - generateTimeoutResponse(); - } - - private void generateTimeoutResponse() { - final String subscriptionClientId = subscriptionEventResponse.getData().getClientId(); - final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName(); - final String subscriptionEventId = subscriptionClientId + subscriptionName; - if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, - "subscriptionCreatedStatus"); - forwardedSubscriptionEventCache.remove(subscriptionEventId); - } - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java deleted file mode 100644 index 8dfdc3cd4e..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL; -import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING; - -import io.cloudevents.CloudEvent; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; -import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - - -@Component -@Slf4j -@RequiredArgsConstructor -public class SubscriptionEventConsumer { - - private final SubscriptionEventForwarder subscriptionEventForwarder; - private final SubscriptionEventMapper subscriptionEventMapper; - private final SubscriptionPersistence subscriptionPersistence; - private final SubscriptionEventCloudMapper subscriptionEventCloudMapper; - - @Value("${notification.enabled:true}") - private boolean notificationFeatureEnabled; - - @Value("${ncmp.model-loader.subscription:false}") - private boolean subscriptionModelLoaderEnabled; - - /** - * Consume the specified event. - * - * @param subscriptionEventConsumerRecord the event to be consumed - */ - @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", - containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeSubscriptionEvent(final ConsumerRecord subscriptionEventConsumerRecord) { - final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value(); - final String eventType = subscriptionEventConsumerRecord.value().getType(); - final SubscriptionEvent subscriptionEvent = subscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent); - final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore(); - if (!eventDatastore.equals(PASSTHROUGH_RUNNING.getDatastoreName()) - || eventDatastore.equals(PASSTHROUGH_OPERATIONAL.getDatastoreName())) { - throw new UnsupportedOperationException( - "passthrough datastores are currently only supported for event subscriptions"); - } - if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) { - if (subscriptionModelLoaderEnabled) { - persistSubscriptionEvent(subscriptionEvent); - } - if ("subscriptionCreated".equals(cloudEvent.getType())) { - log.info("Subscription for ClientID {} with name {} ...", - subscriptionEvent.getData().getSubscription().getClientID(), - subscriptionEvent.getData().getSubscription().getName()); - if (notificationFeatureEnabled) { - subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, eventType); - } - } - } else { - log.trace("Non-CM subscription event ignored"); - } - } - - private void persistSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { - final YangModelSubscriptionEvent yangModelSubscriptionEvent = - subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent); - subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java deleted file mode 100644 index d3bfe81e82..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import com.hazelcast.map.IMap; -import io.cloudevents.CloudEvent; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; -import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; -import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - - -@Component -@Slf4j -@RequiredArgsConstructor -public class SubscriptionEventForwarder { - - private final InventoryPersistence inventoryPersistence; - private final EventsPublisher eventsPublisher; - private final IMap> forwardedSubscriptionEventCache; - private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; - private final SubscriptionEventMapper subscriptionEventMapper; - private final SubscriptionEventCloudMapper subscriptionEventCloudMapper; - private final ClientSubscriptionEventMapper clientSubscriptionEventMapper; - private final SubscriptionPersistence subscriptionPersistence; - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - @Value("${app.ncmp.avc.subscription-forward-topic-prefix}") - private String dmiAvcSubscriptionTopicPrefix; - - @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}") - private int dmiResponseTimeoutInMs; - - /** - * Forward subscription event. - * - * @param subscriptionEvent the event to be forwarded - */ - public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, final String eventType) { - final List cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets(); - if (cmHandleTargets == null || cmHandleTargets.isEmpty() - || cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) { - throw new UnsupportedOperationException( - "CMHandle targets are required. \"Wildcard\" operations are not yet supported"); - } - final Collection yangModelCmHandles = - inventoryPersistence.getYangModelCmHandles(cmHandleTargets); - final Map>> dmiPropertiesPerCmHandleIdPerServiceName - = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); - findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName); - } - - private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType, - final List cmHandleTargetsAsStrings, - final Map>> - dmiPropertiesPerCmHandleIdPerServiceName) { - final SubscriptionEventResponse emptySubscriptionEventResponse = - new SubscriptionEventResponse().withData(new Data()); - emptySubscriptionEventResponse.getData().setSubscriptionName( - subscriptionEvent.getData().getSubscription().getName()); - emptySubscriptionEventResponse.getData().setClientId( - subscriptionEvent.getData().getSubscription().getClientID()); - final List cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream() - .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList()); - - final List targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings); - targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb); - - final Set dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); - - if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) { - updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb); - } - if (dmisToRespond.isEmpty()) { - subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse, - "subscriptionCreatedStatus"); - } else { - startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond); - final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent = - clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent); - forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType); - } - } - - private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse, - final Set dmisToRespond) { - final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId(); - final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName(); - final String subscriptionEventId = subscriptionClientId + subscriptionName; - - forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, - ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); - final ResponseTimeoutTask responseTimeoutTask = - new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome, - emptySubscriptionEventResponse); - - executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); - } - - private void forwardEventToDmis(final Map>> dmiNameCmHandleMap, - final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent - ncmpSubscriptionEvent, final String eventType) { - dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { - final List cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map( - cmHandleAndProperties -> { - final CmHandle cmHandle = new CmHandle(); - cmHandle.setId(cmHandleAndProperties.getKey()); - cmHandle.setAdditionalProperties(cmHandleAndProperties.getValue()); - return cmHandle; - }).collect(Collectors.toList()); - - ncmpSubscriptionEvent.getData().getPredicates().setTargets(cmHandleTargets); - final String eventKey = createEventKey(ncmpSubscriptionEvent, dmiName); - final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; - - final CloudEvent ncmpSubscriptionCloudEvent = - subscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType); - eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent); - }); - } - - private String createEventKey( - final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent subscriptionEvent, - final String dmiName) { - return subscriptionEvent.getData().getSubscription().getClientID() - + "-" - + subscriptionEvent.getData().getSubscription().getName() - + "-" - + dmiName; - } - - private void updatesCmHandlesToRejectedAndPersistSubscriptionEvent( - final SubscriptionEvent subscriptionEvent, - final List targetCmHandlesDoesNotExistInDb) { - final YangModelSubscriptionEvent yangModelSubscriptionEvent = - subscriptionEventMapper.toYangModelSubscriptionEvent(subscriptionEvent); - yangModelSubscriptionEvent.getPredicates() - .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb, - yangModelSubscriptionEvent)); - subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); - } - - private static List findRejectedCmHandles( - final List targetCmHandlesDoesNotExistInDb, - final YangModelSubscriptionEvent yangModelSubscriptionEvent) { - return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream() - .filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId())) - .map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(), - SubscriptionStatus.REJECTED, "Targets not found")) - .collect(Collectors.toList()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java deleted file mode 100644 index 35d94cc7a2..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import java.util.List; -import java.util.stream.Collectors; -import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; - -@Mapper(componentModel = "spring") -public interface SubscriptionEventMapper { - - @Mapping(source = "data.subscription.clientID", target = "clientId") - @Mapping(source = "data.subscription.name", target = "subscriptionName") - @Mapping(source = "data.predicates.targets", target = "predicates.targetCmHandles", - qualifiedByName = "mapTargetsToCmHandleTargets") - @Mapping(source = "data.predicates.datastore", target = "predicates.datastore") - YangModelSubscriptionEvent toYangModelSubscriptionEvent(SubscriptionEvent subscriptionEvent); - - /** - * Maps list of Targets to list of TargetCmHandle. - * - * @param targets list of objects - * @return TargetCmHandle list - */ - @Named("mapTargetsToCmHandleTargets") - default List mapTargetsToCmHandleTargets(List targets) { - return targets.stream().map(YangModelSubscriptionEvent.TargetCmHandle::new) - .collect(Collectors.toList()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java deleted file mode 100644 index b1c0a322dd..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import com.hazelcast.map.IMap; -import io.cloudevents.CloudEvent; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; -import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; -import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; -import org.onap.cps.spi.model.DataNode; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class SubscriptionEventResponseConsumer { - - private final IMap> forwardedSubscriptionEventCache; - private final SubscriptionPersistence subscriptionPersistence; - private final SubscriptionEventResponseMapper subscriptionEventResponseMapper; - private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; - private final SubscriptionEventResponseCloudMapper subscriptionEventResponseCloudMapper; - - @Value("${notification.enabled:true}") - private boolean notificationFeatureEnabled; - - @Value("${ncmp.model-loader.subscription:false}") - private boolean subscriptionModelLoaderEnabled; - - /** - * Consume subscription response event. - * - * @param subscriptionEventResponseConsumerRecord the event to be consumed - */ - @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", - containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") - public void consumeSubscriptionEventResponse( - final ConsumerRecord subscriptionEventResponseConsumerRecord) { - final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value(); - final String eventType = subscriptionEventResponseConsumerRecord.value().getType(); - final SubscriptionEventResponse subscriptionEventResponse = - subscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent); - final String clientId = subscriptionEventResponse.getData().getClientId(); - log.info("subscription event response of clientId: {} is received.", clientId); - final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName(); - final String subscriptionEventId = clientId + subscriptionName; - boolean createOutcomeResponse = false; - if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); - dmiNames.remove(subscriptionEventResponse.getData().getDmiName()); - forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, - ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); - createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); - } - if (subscriptionModelLoaderEnabled) { - updateSubscriptionEvent(subscriptionEventResponse); - } - if (createOutcomeResponse - && notificationFeatureEnabled - && hasNoPendingCmHandles(clientId, subscriptionName)) { - subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType); - forwardedSubscriptionEventCache.remove(subscriptionEventId); - } - } - - private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) { - final Collection dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent( - clientId, subscriptionName); - final Map> cmHandleIdToStatusAndDetailsAsMapOriginal = - DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription); - for (final Map statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) { - final String status = statusAndDetailsMap.get("status"); - if (SubscriptionStatus.PENDING.toString().equals(status)) { - return false; - } - } - return true; - } - - private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) { - final YangModelSubscriptionEvent yangModelSubscriptionEvent = - subscriptionEventResponseMapper - .toYangModelSubscriptionEvent(subscriptionEventResponse); - subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); - } -} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java deleted file mode 100644 index dc122ee5d1..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import java.util.List; -import java.util.stream.Collectors; -import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus; - -@Mapper(componentModel = "spring") -public interface SubscriptionEventResponseMapper { - - @Mapping(source = "data.clientId", target = "clientId") - @Mapping(source = "data.subscriptionName", target = "subscriptionName") - @Mapping(source = "data.subscriptionStatus", target = "predicates.targetCmHandles", - qualifiedByName = "mapSubscriptionStatusToCmHandleTargets") - YangModelSubscriptionEvent toYangModelSubscriptionEvent( - SubscriptionEventResponse subscriptionEventResponse); - - /** - * Maps SubscriptionStatus to list of TargetCmHandle. - * - * @param subscriptionStatus as a list - * @return TargetCmHandle list - */ - @Named("mapSubscriptionStatusToCmHandleTargets") - default List mapSubscriptionStatusToCmHandleTargets( - List subscriptionStatus) { - return subscriptionStatus.stream().map(status -> new YangModelSubscriptionEvent.TargetCmHandle(status.getId(), - org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus.fromString(status.getStatus().value()), - status.getDetails())).collect(Collectors.toList()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java deleted file mode 100644 index 822ca55091..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import io.cloudevents.CloudEvent; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.NcmpEventResponseCode; -import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; -import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; -import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -@RequiredArgsConstructor -public class SubscriptionEventResponseOutcome { - - private final SubscriptionPersistence subscriptionPersistence; - - private final EventsPublisher outcomeEventsPublisher; - - private final SubscriptionOutcomeMapper subscriptionOutcomeMapper; - - private final SubscriptionOutcomeCloudMapper subscriptionOutcomeCloudMapper; - - @Value("${app.ncmp.avc.subscription-outcome-topic:subscription-response}") - private String subscriptionOutcomeEventTopic; - - /** - * This is for construction of outcome message to be published for client apps. - * - * @param subscriptionEventResponse event produced by Dmi Plugin - */ - public void sendResponse(final SubscriptionEventResponse subscriptionEventResponse, final String eventKey) { - final SubscriptionEventOutcome subscriptionEventOutcome = - formSubscriptionOutcomeMessage(subscriptionEventResponse); - final String subscriptionClientId = subscriptionEventResponse.getData().getClientId(); - final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName(); - final String subscriptionEventId = subscriptionClientId + subscriptionName; - final CloudEvent subscriptionOutcomeCloudEvent = - subscriptionOutcomeCloudMapper.toCloudEvent(subscriptionEventOutcome, - subscriptionEventId, eventKey); - outcomeEventsPublisher.publishCloudEvent(subscriptionOutcomeEventTopic, - subscriptionEventId, subscriptionOutcomeCloudEvent); - } - - private SubscriptionEventOutcome formSubscriptionOutcomeMessage( - final SubscriptionEventResponse subscriptionEventResponse) { - final Map> cmHandleIdToStatusAndDetailsAsMap = - DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode( - subscriptionPersistence.getCmHandlesForSubscriptionEvent( - subscriptionEventResponse.getData().getClientId(), - subscriptionEventResponse.getData().getSubscriptionName())); - final List - subscriptionStatusList = mapCmHandleIdStatusDetailsMapToSubscriptionStatusList( - cmHandleIdToStatusAndDetailsAsMap); - subscriptionEventResponse.getData().setSubscriptionStatus(subscriptionStatusList); - return fromSubscriptionEventResponse(subscriptionEventResponse, - decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap)); - } - - private static List - mapCmHandleIdStatusDetailsMapToSubscriptionStatusList( - final Map> cmHandleIdToStatusAndDetailsAsMap) { - return cmHandleIdToStatusAndDetailsAsMap.entrySet() - .stream().map(entryset -> { - final org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus - subscriptionStatus = new org.onap.cps.ncmp.events.avcsubscription1_0_0 - .dmi_to_ncmp.SubscriptionStatus(); - final String cmHandleId = entryset.getKey(); - final Map statusAndDetailsMap = entryset.getValue(); - final String status = statusAndDetailsMap.get("status"); - final String details = statusAndDetailsMap.get("details"); - subscriptionStatus.setId(cmHandleId); - subscriptionStatus.setStatus( - org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp - .SubscriptionStatus.Status.fromValue(status)); - subscriptionStatus.setDetails(details); - return subscriptionStatus; - }).collect(Collectors.toList()); - } - - private NcmpEventResponseCode decideOnNcmpEventResponseCodeForSubscription( - final Map> cmHandleIdToStatusAndDetailsAsMap) { - - final boolean isAllTargetsPending = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, - SubscriptionStatus.PENDING); - - final boolean isAllTargetsRejected = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, - SubscriptionStatus.REJECTED); - - final boolean isAllTargetsAccepted = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, - SubscriptionStatus.ACCEPTED); - - if (isAllTargetsAccepted) { - return NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION; - } else if (isAllTargetsRejected) { - return NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE; - } else if (isAllTargetsPending) { - return NcmpEventResponseCode.SUBSCRIPTION_PENDING; - } else { - return NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION; - } - } - - private boolean isAllTargetCmHandleStatusMatch( - final Map> cmHandleIdToStatusAndDetailsAsMap, - final SubscriptionStatus subscriptionStatus) { - return cmHandleIdToStatusAndDetailsAsMap.values().stream() - .allMatch(entryset -> entryset.containsValue(subscriptionStatus.toString())); - } - - private SubscriptionEventOutcome fromSubscriptionEventResponse( - final SubscriptionEventResponse subscriptionEventResponse, - final NcmpEventResponseCode ncmpEventResponseCode) { - - final SubscriptionEventOutcome subscriptionEventOutcome = - subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse); - subscriptionEventOutcome.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode())); - subscriptionEventOutcome.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage()); - - return subscriptionEventOutcome; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java deleted file mode 100644 index 7803b982f3..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avcsubscription; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.mapstruct.Mapper; -import org.mapstruct.Mapping; -import org.mapstruct.Named; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfo; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome; -import org.onap.cps.spi.exceptions.DataValidationException; - -@Mapper(componentModel = "spring") -public interface SubscriptionOutcomeMapper { - - @Mapping(source = "data.subscriptionStatus", target = "data.additionalInfo", - qualifiedByName = "mapListOfSubscriptionStatusToAdditionalInfo") - SubscriptionEventOutcome toSubscriptionEventOutcome(SubscriptionEventResponse subscriptionEventResponse); - - /** - * Maps list of SubscriptionStatus to an AdditionalInfo. - * - * @param subscriptionStatusList containing details - * @return an AdditionalInfo - */ - @Named("mapListOfSubscriptionStatusToAdditionalInfo") - default AdditionalInfo mapListOfSubscriptionStatusToAdditionalInfo( - final List subscriptionStatusList) { - if (subscriptionStatusList == null || subscriptionStatusList.isEmpty()) { - throw new DataValidationException("Invalid subscriptionStatusList", - "SubscriptionStatus list cannot be null or empty"); - } - - final Map> rejectedSubscriptionsPerDetails = getSubscriptionsPerDetails( - subscriptionStatusList, SubscriptionStatus.Status.REJECTED); - final Map> rejectedCmHandlesPerDetails = - getCmHandlesPerDetails(rejectedSubscriptionsPerDetails); - final List rejectedCmHandles = getAdditionalInfoDetailList(rejectedCmHandlesPerDetails); - - - final Map> pendingSubscriptionsPerDetails = getSubscriptionsPerDetails( - subscriptionStatusList, SubscriptionStatus.Status.PENDING); - final Map> pendingCmHandlesPerDetails = - getCmHandlesPerDetails(pendingSubscriptionsPerDetails); - final List pendingCmHandles = getAdditionalInfoDetailList(pendingCmHandlesPerDetails); - - final AdditionalInfo additionalInfo = new AdditionalInfo(); - additionalInfo.setRejected(rejectedCmHandles); - additionalInfo.setPending(pendingCmHandles); - - return additionalInfo; - } - - private static Map> getSubscriptionsPerDetails( - final List subscriptionStatusList, final SubscriptionStatus.Status status) { - return subscriptionStatusList.stream() - .filter(subscriptionStatus -> subscriptionStatus.getStatus() == status) - .collect(Collectors.groupingBy(SubscriptionStatus::getDetails)); - } - - private static Map> getCmHandlesPerDetails( - final Map> subscriptionsPerDetails) { - return subscriptionsPerDetails.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().stream() - .map(SubscriptionStatus::getId) - .collect(Collectors.toList()) - )); - } - - private static List getAdditionalInfoDetailList( - final Map> cmHandlesPerDetails) { - return cmHandlesPerDetails.entrySet().stream() - .map(entry -> { - final AdditionalInfoDetail detail = new AdditionalInfoDetail(); - detail.setDetails(entry.getKey()); - detail.setTargets(entry.getValue()); - return detail; - }).collect(Collectors.toList()); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventConsumer.java new file mode 100644 index 0000000000..3a7e0c6cf5 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventConsumer.java @@ -0,0 +1,118 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import com.hazelcast.map.IMap; +import io.cloudevents.CloudEvent; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; +import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; +import org.onap.cps.spi.model.DataNode; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class CmSubscriptionDmiOutEventConsumer { + + private final IMap> forwardedSubscriptionEventCache; + private final SubscriptionPersistence subscriptionPersistence; + private final CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper + cmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper; + private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher; + private final SubscriptionEventResponseCloudMapper subscriptionEventResponseCloudMapper; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + + @Value("${ncmp.model-loader.subscription:false}") + private boolean subscriptionModelLoaderEnabled; + + /** + * Consume subscription response event. + * + * @param cmSubscriptionDmiOutConsumerRecord the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") + public void consumeSubscriptionEventResponse( + final ConsumerRecord cmSubscriptionDmiOutConsumerRecord) { + final CloudEvent cloudEvent = cmSubscriptionDmiOutConsumerRecord.value(); + final String eventType = cmSubscriptionDmiOutConsumerRecord.value().getType(); + final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent = + subscriptionEventResponseCloudMapper.toCmSubscriptionDmiOutEvent(cloudEvent); + final String clientId = cmSubscriptionDmiOutEvent.getData().getClientId(); + log.info("subscription event response of clientId: {} is received.", clientId); + final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName(); + final String subscriptionEventId = clientId + subscriptionName; + boolean createOutcomeResponse = false; + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + dmiNames.remove(cmSubscriptionDmiOutEvent.getData().getDmiName()); + forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); + } + if (subscriptionModelLoaderEnabled) { + updateSubscriptionEvent(cmSubscriptionDmiOutEvent); + } + if (createOutcomeResponse + && notificationFeatureEnabled + && hasNoPendingCmHandles(clientId, subscriptionName)) { + cmSubscriptionNcmpOutEventPublisher.sendResponse(cmSubscriptionDmiOutEvent, eventType); + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } + + private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) { + final Collection dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent( + clientId, subscriptionName); + final Map> cmHandleIdToStatusAndDetailsAsMapOriginal = + DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription); + for (final Map statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) { + final String status = statusAndDetailsMap.get("status"); + if (SubscriptionStatus.PENDING.toString().equals(status)) { + return false; + } + } + return true; + } + + private void updateSubscriptionEvent(final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) { + final YangModelSubscriptionEvent yangModelSubscriptionEvent = + cmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper + .toYangModelSubscriptionEvent(cmSubscriptionDmiOutEvent); + subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper.java new file mode 100644 index 0000000000..99452c6c24 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper.java @@ -0,0 +1,105 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.AdditionalInfo; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent; +import org.onap.cps.spi.exceptions.DataValidationException; + +@Mapper(componentModel = "spring") +public interface CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper { + + @Mapping(source = "data.subscriptionStatus", target = "data.additionalInfo", + qualifiedByName = "mapListOfSubscriptionStatusToAdditionalInfo") + CmSubscriptionNcmpOutEvent toCmSubscriptionNcmpOutEvent(CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent); + + /** + * Maps list of SubscriptionStatus to an AdditionalInfo. + * + * @param subscriptionStatusList containing details + * @return an AdditionalInfo + */ + @Named("mapListOfSubscriptionStatusToAdditionalInfo") + default AdditionalInfo mapListOfSubscriptionStatusToAdditionalInfo( + final List subscriptionStatusList) { + if (subscriptionStatusList == null || subscriptionStatusList.isEmpty()) { + throw new DataValidationException("Invalid subscriptionStatusList", + "SubscriptionStatus list cannot be null or empty"); + } + + final Map> rejectedSubscriptionsPerDetails = getSubscriptionsPerDetails( + subscriptionStatusList, SubscriptionStatus.Status.REJECTED); + final Map> rejectedCmHandlesPerDetails = + getCmHandlesPerDetails(rejectedSubscriptionsPerDetails); + final List rejectedCmHandles = getAdditionalInfoDetailList(rejectedCmHandlesPerDetails); + + + final Map> pendingSubscriptionsPerDetails = getSubscriptionsPerDetails( + subscriptionStatusList, SubscriptionStatus.Status.PENDING); + final Map> pendingCmHandlesPerDetails = + getCmHandlesPerDetails(pendingSubscriptionsPerDetails); + final List pendingCmHandles = getAdditionalInfoDetailList(pendingCmHandlesPerDetails); + + final AdditionalInfo additionalInfo = new AdditionalInfo(); + additionalInfo.setRejected(rejectedCmHandles); + additionalInfo.setPending(pendingCmHandles); + + return additionalInfo; + } + + private static Map> getSubscriptionsPerDetails( + final List subscriptionStatusList, final SubscriptionStatus.Status status) { + return subscriptionStatusList.stream() + .filter(subscriptionStatus -> subscriptionStatus.getStatus() == status) + .collect(Collectors.groupingBy(SubscriptionStatus::getDetails)); + } + + private static Map> getCmHandlesPerDetails( + final Map> subscriptionsPerDetails) { + return subscriptionsPerDetails.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().stream() + .map(SubscriptionStatus::getId) + .collect(Collectors.toList()) + )); + } + + private static List getAdditionalInfoDetailList( + final Map> cmHandlesPerDetails) { + return cmHandlesPerDetails.entrySet().stream() + .map(entry -> { + final AdditionalInfoDetail detail = new AdditionalInfoDetail(); + detail.setDetails(entry.getKey()); + detail.setTargets(entry.getValue()); + return detail; + }).collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper.java new file mode 100644 index 0000000000..77eebe36f5 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper.java @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import java.util.List; +import java.util.stream.Collectors; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus; + +@Mapper(componentModel = "spring") +public interface CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper { + + @Mapping(source = "data.clientId", target = "clientId") + @Mapping(source = "data.subscriptionName", target = "subscriptionName") + @Mapping(source = "data.subscriptionStatus", target = "predicates.targetCmHandles", + qualifiedByName = "mapSubscriptionStatusToCmHandleTargets") + YangModelSubscriptionEvent toYangModelSubscriptionEvent( + CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent); + + /** + * Maps SubscriptionStatus to list of TargetCmHandle. + * + * @param subscriptionStatus as a list + * @return TargetCmHandle list + */ + @Named("mapSubscriptionStatusToCmHandleTargets") + default List mapSubscriptionStatusToCmHandleTargets( + List subscriptionStatus) { + return subscriptionStatus.stream().map(status -> new YangModelSubscriptionEvent.TargetCmHandle(status.getId(), + org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus.fromString(status.getStatus().value()), + status.getDetails())).collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java new file mode 100644 index 0000000000..c64ebacb01 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventConsumer.java @@ -0,0 +1,97 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022-2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL; +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING; + +import io.cloudevents.CloudEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.utils.CmSubscriptionEventCloudMapper; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +@RequiredArgsConstructor +public class CmSubscriptionNcmpInEventConsumer { + + private final CmSubscriptionNcmpInEventForwarder cmSubscriptionNcmpInEventForwarder; + private final CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper; + private final SubscriptionPersistence subscriptionPersistence; + private final CmSubscriptionEventCloudMapper cmSubscriptionEventCloudMapper; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + + @Value("${ncmp.model-loader.subscription:false}") + private boolean subscriptionModelLoaderEnabled; + + /** + * Consume the specified event. + * + * @param subscriptionEventConsumerRecord the event to be consumed + */ + @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") + public void consumeSubscriptionEvent(final ConsumerRecord subscriptionEventConsumerRecord) { + final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value(); + final String eventType = subscriptionEventConsumerRecord.value().getType(); + final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent = + cmSubscriptionEventCloudMapper.toCmSubscriptionNcmpInEvent(cloudEvent); + final String eventDatastore = cmSubscriptionNcmpInEvent.getData().getPredicates().getDatastore(); + if (!eventDatastore.equals(PASSTHROUGH_RUNNING.getDatastoreName()) || eventDatastore.equals( + PASSTHROUGH_OPERATIONAL.getDatastoreName())) { + throw new UnsupportedOperationException( + "passthrough datastores are currently only supported for event subscriptions"); + } + if ("CM".equals(cmSubscriptionNcmpInEvent.getData().getDataType().getDataCategory())) { + if (subscriptionModelLoaderEnabled) { + persistSubscriptionEvent(cmSubscriptionNcmpInEvent); + } + if ("subscriptionCreated".equals(cloudEvent.getType())) { + log.info("Subscription for ClientID {} with name {} ...", + cmSubscriptionNcmpInEvent.getData().getSubscription().getClientID(), + cmSubscriptionNcmpInEvent.getData().getSubscription().getName()); + if (notificationFeatureEnabled) { + cmSubscriptionNcmpInEventForwarder.forwardCreateSubscriptionEvent(cmSubscriptionNcmpInEvent, + eventType); + } + } + } else { + log.trace("Non-CM subscription event ignored"); + } + } + + private void persistSubscriptionEvent(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent) { + final YangModelSubscriptionEvent yangModelSubscriptionEvent = + cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(cmSubscriptionNcmpInEvent); + subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventForwarder.java new file mode 100644 index 0000000000..4a174954ea --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventForwarder.java @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import com.hazelcast.map.IMap; +import io.cloudevents.CloudEvent; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; +import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.impl.utils.CmSubscriptionEventCloudMapper; +import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.Data; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmHandle; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +@RequiredArgsConstructor +public class CmSubscriptionNcmpInEventForwarder { + + private final InventoryPersistence inventoryPersistence; + private final EventsPublisher eventsPublisher; + private final IMap> forwardedSubscriptionEventCache; + private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher; + private final CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper; + private final CmSubscriptionEventCloudMapper cmSubscriptionEventCloudMapper; + private final CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper + cmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper; + private final SubscriptionPersistence subscriptionPersistence; + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Value("${app.ncmp.avc.subscription-forward-topic-prefix}") + private String dmiAvcSubscriptionTopicPrefix; + + @Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms:30000}") + private int dmiResponseTimeoutInMs; + + /** + * Forward subscription event. + * + * @param cmSubscriptionNcmpInEvent the event to be forwarded + */ + public void forwardCreateSubscriptionEvent(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent, + final String eventType) { + final List cmHandleTargets = cmSubscriptionNcmpInEvent.getData().getPredicates().getTargets(); + if (cmHandleTargets == null || cmHandleTargets.isEmpty() || cmHandleTargets.stream() + .anyMatch(id -> (id).contains("*"))) { + throw new UnsupportedOperationException( + "CMHandle targets are required. \"Wildcard\" operations are not yet supported"); + } + final Collection yangModelCmHandles = + inventoryPersistence.getYangModelCmHandles(cmHandleTargets); + final Map>> dmiPropertiesPerCmHandleIdPerServiceName = + DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); + findDmisAndRespond(cmSubscriptionNcmpInEvent, eventType, cmHandleTargets, + dmiPropertiesPerCmHandleIdPerServiceName); + } + + private void findDmisAndRespond(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent, final String eventType, + final List cmHandleTargetsAsStrings, + final Map>> dmiPropertiesPerCmHandleIdPerServiceName) { + final CmSubscriptionDmiOutEvent emptyCmSubscriptionDmiOutEvent = + new CmSubscriptionDmiOutEvent().withData(new Data()); + emptyCmSubscriptionDmiOutEvent.getData() + .setSubscriptionName(cmSubscriptionNcmpInEvent.getData().getSubscription().getName()); + emptyCmSubscriptionDmiOutEvent.getData() + .setClientId(cmSubscriptionNcmpInEvent.getData().getSubscription().getClientID()); + final List cmHandlesThatExistsInDb = + dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream().map(Map.Entry::getValue).map(Map::keySet) + .flatMap(Set::stream).collect(Collectors.toList()); + + final List targetCmHandlesDoesNotExistInDb = new ArrayList<>(cmHandleTargetsAsStrings); + targetCmHandlesDoesNotExistInDb.removeAll(cmHandlesThatExistsInDb); + + final Set dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); + + if (dmisToRespond.isEmpty() || !targetCmHandlesDoesNotExistInDb.isEmpty()) { + updatesCmHandlesToRejectedAndPersistSubscriptionEvent(cmSubscriptionNcmpInEvent, + targetCmHandlesDoesNotExistInDb); + } + if (dmisToRespond.isEmpty()) { + cmSubscriptionNcmpOutEventPublisher.sendResponse(emptyCmSubscriptionDmiOutEvent, + "subscriptionCreatedStatus"); + } else { + startResponseTimeout(emptyCmSubscriptionDmiOutEvent, dmisToRespond); + final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent = + cmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper.toCmSubscriptionDmiInEvent( + cmSubscriptionNcmpInEvent); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, cmSubscriptionDmiInEvent, eventType); + } + } + + private void startResponseTimeout(final CmSubscriptionDmiOutEvent emptyCmSubscriptionDmiOutEvent, + final Set dmisToRespond) { + final String subscriptionClientId = emptyCmSubscriptionDmiOutEvent.getData().getClientId(); + final String subscriptionName = emptyCmSubscriptionDmiOutEvent.getData().getSubscriptionName(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + + forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + final ResponseTimeoutTask responseTimeoutTask = + new ResponseTimeoutTask(forwardedSubscriptionEventCache, cmSubscriptionNcmpOutEventPublisher, + emptyCmSubscriptionDmiOutEvent); + + executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); + } + + private void forwardEventToDmis(final Map>> dmiNameCmHandleMap, + final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent, final String eventType) { + dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { + final List cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map( + cmHandleAndProperties -> { + final CmHandle cmHandle = new CmHandle(); + cmHandle.setId(cmHandleAndProperties.getKey()); + cmHandle.setAdditionalProperties(cmHandleAndProperties.getValue()); + return cmHandle; + }).collect(Collectors.toList()); + + cmSubscriptionDmiInEvent.getData().getPredicates().setTargets(cmHandleTargets); + final String eventKey = createEventKey(cmSubscriptionDmiInEvent, dmiName); + final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; + + final CloudEvent cmSubscriptionDmiInCloudEvent = + cmSubscriptionEventCloudMapper.toCloudEvent(cmSubscriptionDmiInEvent, eventKey, eventType); + eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, cmSubscriptionDmiInCloudEvent); + }); + } + + private String createEventKey(final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent, final String dmiName) { + return cmSubscriptionDmiInEvent.getData().getSubscription().getClientID() + "-" + + cmSubscriptionDmiInEvent.getData().getSubscription().getName() + "-" + dmiName; + } + + private void updatesCmHandlesToRejectedAndPersistSubscriptionEvent( + final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent, + final List targetCmHandlesDoesNotExistInDb) { + final YangModelSubscriptionEvent yangModelSubscriptionEvent = + cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(cmSubscriptionNcmpInEvent); + yangModelSubscriptionEvent.getPredicates() + .setTargetCmHandles(findRejectedCmHandles(targetCmHandlesDoesNotExistInDb, yangModelSubscriptionEvent)); + subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent); + } + + private static List findRejectedCmHandles( + final List targetCmHandlesDoesNotExistInDb, + final YangModelSubscriptionEvent yangModelSubscriptionEvent) { + return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream() + .filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId())) + .map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(), + SubscriptionStatus.REJECTED, "Targets not found")) + .collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventMapper.java new file mode 100644 index 0000000000..ab93f13a2c --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventMapper.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import java.util.List; +import java.util.stream.Collectors; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent; + +@Mapper(componentModel = "spring") +public interface CmSubscriptionNcmpInEventMapper { + + @Mapping(source = "data.subscription.clientID", target = "clientId") + @Mapping(source = "data.subscription.name", target = "subscriptionName") + @Mapping(source = "data.predicates.targets", target = "predicates.targetCmHandles", + qualifiedByName = "mapTargetsToCmHandleTargets") + @Mapping(source = "data.predicates.datastore", target = "predicates.datastore") + YangModelSubscriptionEvent toYangModelSubscriptionEvent(CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent); + + /** + * Maps list of Targets to list of TargetCmHandle. + * + * @param targets list of objects + * @return TargetCmHandle list + */ + @Named("mapTargetsToCmHandleTargets") + default List mapTargetsToCmHandleTargets(List targets) { + return targets.stream().map(YangModelSubscriptionEvent.TargetCmHandle::new) + .collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper.java new file mode 100644 index 0000000000..f1c1664537 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper.java @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent; + +@Mapper(componentModel = "spring") +public interface CmSubscriptionNcmpInEventToCmSubscriptionDmiInEventMapper { + + @Mapping(target = "data.predicates.targets", ignore = true) + CmSubscriptionDmiInEvent toCmSubscriptionDmiInEvent( + CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent); + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpOutEventPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpOutEventPublisher.java new file mode 100644 index 0000000000..38cc724be0 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmSubscriptionNcmpOutEventPublisher.java @@ -0,0 +1,153 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import io.cloudevents.CloudEvent; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.NcmpEventResponseCode; +import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; +import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class CmSubscriptionNcmpOutEventPublisher { + + private final SubscriptionPersistence subscriptionPersistence; + + private final EventsPublisher outcomeEventsPublisher; + + private final CmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper + cmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper; + + private final SubscriptionOutcomeCloudMapper subscriptionOutcomeCloudMapper; + + @Value("${app.ncmp.avc.subscription-outcome-topic:subscription-response}") + private String subscriptionOutcomeEventTopic; + + /** + * This is for construction of outcome message to be published for client apps. + * + * @param cmSubscriptionDmiOutEvent event produced by Dmi Plugin + */ + public void sendResponse(final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent, final String eventKey) { + final CmSubscriptionNcmpOutEvent cmSubscriptionNcmpOutEvent = + formCmSubscriptionNcmpOutEvent(cmSubscriptionDmiOutEvent); + final String subscriptionClientId = cmSubscriptionDmiOutEvent.getData().getClientId(); + final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + final CloudEvent subscriptionOutcomeCloudEvent = + subscriptionOutcomeCloudMapper.toCloudEvent(cmSubscriptionNcmpOutEvent, + subscriptionEventId, eventKey); + outcomeEventsPublisher.publishCloudEvent(subscriptionOutcomeEventTopic, + subscriptionEventId, subscriptionOutcomeCloudEvent); + } + + private CmSubscriptionNcmpOutEvent formCmSubscriptionNcmpOutEvent( + final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) { + final Map> cmHandleIdToStatusAndDetailsAsMap = + DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode( + subscriptionPersistence.getCmHandlesForSubscriptionEvent( + cmSubscriptionDmiOutEvent.getData().getClientId(), + cmSubscriptionDmiOutEvent.getData().getSubscriptionName())); + final List + subscriptionStatusList = + mapCmHandleIdStatusDetailsMapToSubscriptionStatusList(cmHandleIdToStatusAndDetailsAsMap); + cmSubscriptionDmiOutEvent.getData().setSubscriptionStatus(subscriptionStatusList); + return fromDmiOutEvent(cmSubscriptionDmiOutEvent, + decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap)); + } + + private static List + mapCmHandleIdStatusDetailsMapToSubscriptionStatusList( + final Map> cmHandleIdToStatusAndDetailsAsMap) { + return cmHandleIdToStatusAndDetailsAsMap.entrySet() + .stream().map(entryset -> { + final org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus + subscriptionStatus = new org.onap.cps.ncmp.events.cmsubscription1_0_0 + .dmi_to_ncmp.SubscriptionStatus(); + final String cmHandleId = entryset.getKey(); + final Map statusAndDetailsMap = entryset.getValue(); + final String status = statusAndDetailsMap.get("status"); + final String details = statusAndDetailsMap.get("details"); + subscriptionStatus.setId(cmHandleId); + subscriptionStatus.setStatus( + org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp + .SubscriptionStatus.Status.fromValue(status)); + subscriptionStatus.setDetails(details); + return subscriptionStatus; + }).collect(Collectors.toList()); + } + + private NcmpEventResponseCode decideOnNcmpEventResponseCodeForSubscription( + final Map> cmHandleIdToStatusAndDetailsAsMap) { + + final boolean isAllTargetsPending = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, + SubscriptionStatus.PENDING); + + final boolean isAllTargetsRejected = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, + SubscriptionStatus.REJECTED); + + final boolean isAllTargetsAccepted = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, + SubscriptionStatus.ACCEPTED); + + if (isAllTargetsAccepted) { + return NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION; + } else if (isAllTargetsRejected) { + return NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE; + } else if (isAllTargetsPending) { + return NcmpEventResponseCode.SUBSCRIPTION_PENDING; + } else { + return NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION; + } + } + + private boolean isAllTargetCmHandleStatusMatch( + final Map> cmHandleIdToStatusAndDetailsAsMap, + final SubscriptionStatus subscriptionStatus) { + return cmHandleIdToStatusAndDetailsAsMap.values().stream() + .allMatch(entryset -> entryset.containsValue(subscriptionStatus.toString())); + } + + private CmSubscriptionNcmpOutEvent fromDmiOutEvent( + final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent, + final NcmpEventResponseCode ncmpEventResponseCode) { + + final CmSubscriptionNcmpOutEvent cmSubscriptionNcmpOutEvent = + cmSubscriptionDmiOutEventToCmSubscriptionNcmpOutEventMapper.toCmSubscriptionNcmpOutEvent( + cmSubscriptionDmiOutEvent); + cmSubscriptionNcmpOutEvent.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode())); + cmSubscriptionNcmpOutEvent.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage()); + + return cmSubscriptionNcmpOutEvent; + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/ResponseTimeoutTask.java new file mode 100644 index 0000000000..7f8cbf676e --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/ResponseTimeoutTask.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.cmsubscription; + +import com.hazelcast.map.IMap; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; + +@Slf4j +@RequiredArgsConstructor +public class ResponseTimeoutTask implements Runnable { + + private final IMap> forwardedSubscriptionEventCache; + private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher; + private final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent; + + @Override + public void run() { + generateTimeoutResponse(); + } + + private void generateTimeoutResponse() { + final String subscriptionClientId = cmSubscriptionDmiOutEvent.getData().getClientId(); + final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { + cmSubscriptionNcmpOutEventPublisher.sendResponse(cmSubscriptionDmiOutEvent, + "subscriptionCreatedStatus"); + forwardedSubscriptionEventCache.remove(subscriptionEventId); + } + } +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/CmSubscriptionEventCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/CmSubscriptionEventCloudMapper.java new file mode 100644 index 0000000000..5bc38e1dcd --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/CmSubscriptionEventCloudMapper.java @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.CloudEventUtils; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.PojoCloudEventData; +import io.cloudevents.jackson.PojoCloudEventDataMapper; +import java.net.URI; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class CmSubscriptionEventCloudMapper { + + private final ObjectMapper objectMapper; + + private static String randomId = UUID.randomUUID().toString(); + + /** + * Maps CloudEvent object to CmSubscriptionNcmpInEvent. + * + * @param cloudEvent object. + * @return CmSubscriptionNcmpInEvent deserialized. + */ + public CmSubscriptionNcmpInEvent toCmSubscriptionNcmpInEvent(final CloudEvent cloudEvent) { + final PojoCloudEventData deserializedCloudEvent = CloudEventUtils + .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, CmSubscriptionNcmpInEvent.class)); + if (deserializedCloudEvent == null) { + log.debug("No data found in the consumed event"); + return null; + } else { + final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent = deserializedCloudEvent.getValue(); + log.debug("Consuming event {}", cmSubscriptionNcmpInEvent); + return cmSubscriptionNcmpInEvent; + } + } + + /** + * Maps CmSubscriptionDmiInEvent to a CloudEvent. + * + * @param cmSubscriptionDmiInEvent object. + * @param eventKey as String. + * @return CloudEvent built. + */ + public CloudEvent toCloudEvent(final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent, final String eventKey, + final String eventType) { + try { + return CloudEventBuilder.v1().withId(randomId) + .withSource(URI.create(cmSubscriptionDmiInEvent.getData().getSubscription().getClientID())) + .withType(eventType).withExtension("correlationid", eventKey) + .withDataSchema(URI.create("urn:cps:" + CmSubscriptionDmiInEvent.class.getName() + ":1.0.0")) + .withData(objectMapper.writeValueAsBytes(cmSubscriptionDmiInEvent)).build(); + } catch (final JsonProcessingException jsonProcessingException) { + log.error("The Cloud Event could not be constructed", jsonProcessingException); + } + return null; + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java deleted file mode 100644 index 1561edc448..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.utils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.cloudevents.CloudEvent; -import io.cloudevents.core.CloudEventUtils; -import io.cloudevents.core.builder.CloudEventBuilder; -import io.cloudevents.core.data.PojoCloudEventData; -import io.cloudevents.jackson.PojoCloudEventDataMapper; -import java.net.URI; -import java.util.UUID; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@RequiredArgsConstructor -public class SubscriptionEventCloudMapper { - - private final ObjectMapper objectMapper; - - private static String randomId = UUID.randomUUID().toString(); - - /** - * Maps CloudEvent object to SubscriptionEvent. - * - * @param cloudEvent object. - * @return SubscriptionEvent deserialized. - */ - public SubscriptionEvent toSubscriptionEvent(final CloudEvent cloudEvent) { - final PojoCloudEventData deserializedCloudEvent = CloudEventUtils - .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEvent.class)); - if (deserializedCloudEvent == null) { - log.debug("No data found in the consumed event"); - return null; - } else { - final SubscriptionEvent subscriptionEvent = deserializedCloudEvent.getValue(); - log.debug("Consuming event {}", subscriptionEvent); - return subscriptionEvent; - } - } - - /** - * Maps SubscriptionEvent to a CloudEvent. - * - * @param ncmpSubscriptionEvent object. - * @param eventKey as String. - * @return CloudEvent built. - */ - public CloudEvent toCloudEvent( - final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent, - final String eventKey, final String eventType) { - try { - return CloudEventBuilder.v1() - .withId(randomId) - .withSource(URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID())) - .withType(eventType) - .withExtension("correlationid", eventKey) - .withDataSchema(URI.create("urn:cps:" - + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi - .SubscriptionEvent.class.getName() + ":1.0.0")) - .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent)).build(); - } catch (final JsonProcessingException jsonProcessingException) { - log.error("The Cloud Event could not be constructed", jsonProcessingException); - } - return null; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java index e00bb16b93..0721d1d569 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java @@ -27,7 +27,7 @@ import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.jackson.PojoCloudEventDataMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent; import org.springframework.stereotype.Component; @Slf4j @@ -38,21 +38,21 @@ public class SubscriptionEventResponseCloudMapper { private final ObjectMapper objectMapper; /** - * Maps CloudEvent object to SubscriptionEventResponse. + * Maps CloudEvent object to CmSubscriptionDmiOutEvent. * * @param cloudEvent object - * @return SubscriptionEventResponse deserialized + * @return CmSubscriptionDmiOutEvent deserialized */ - public SubscriptionEventResponse toSubscriptionEventResponse(final CloudEvent cloudEvent) { - final PojoCloudEventData deserializedCloudEvent = CloudEventUtils - .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEventResponse.class)); + public CmSubscriptionDmiOutEvent toCmSubscriptionDmiOutEvent(final CloudEvent cloudEvent) { + final PojoCloudEventData deserializedCloudEvent = CloudEventUtils + .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, CmSubscriptionDmiOutEvent.class)); if (deserializedCloudEvent == null) { log.debug("No data found in the consumed subscription response event"); return null; } else { - final SubscriptionEventResponse subscriptionEventResponse = deserializedCloudEvent.getValue(); - log.debug("Consuming subscription response event {}", subscriptionEventResponse); - return subscriptionEventResponse; + final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent = deserializedCloudEvent.getValue(); + log.debug("Consuming subscription response event {}", cmSubscriptionDmiOutEvent); + return cmSubscriptionDmiOutEvent; } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java index 9ea4487063..af629a6bd1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java @@ -28,7 +28,7 @@ import java.net.URI; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome; +import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_client.CmSubscriptionNcmpOutEvent; import org.springframework.stereotype.Component; @Slf4j @@ -41,12 +41,12 @@ public class SubscriptionOutcomeCloudMapper { private static String randomId = UUID.randomUUID().toString(); /** - * Maps SubscriptionEventOutcome to a CloudEvent. + * Maps CmSubscriptionNcmpOutEvent to a CloudEvent. * - * @param subscriptionEventOutcome object + * @param cmSubscriptionNcmpOutEvent object * @return CloudEvent */ - public CloudEvent toCloudEvent(final SubscriptionEventOutcome subscriptionEventOutcome, + public CloudEvent toCloudEvent(final CmSubscriptionNcmpOutEvent cmSubscriptionNcmpOutEvent, final String eventKey, final String eventType) { try { return CloudEventBuilder.v1() @@ -54,8 +54,8 @@ public class SubscriptionOutcomeCloudMapper { .withSource(URI.create("NCMP")) .withType(eventType) .withExtension("correlationid", eventKey) - .withDataSchema(URI.create("urn:cps:" + SubscriptionEventOutcome.class.getName() + ":1.0.0")) - .withData(objectMapper.writeValueAsBytes(subscriptionEventOutcome)).build(); + .withDataSchema(URI.create("urn:cps:" + CmSubscriptionNcmpOutEvent.class.getName() + ":1.0.0")) + .withData(objectMapper.writeValueAsBytes(cmSubscriptionNcmpOutEvent)).build(); } catch (final JsonProcessingException jsonProcessingException) { log.error("The Cloud Event could not be constructed", jsonProcessingException); } -- cgit 1.2.3-korg