From 900261c1935dba87179d2e4aa36cede4826186ca Mon Sep 17 00:00:00 2001 From: "halil.cakal" Date: Mon, 8 May 2023 14:18:26 +0100 Subject: Subscription Create Event Outcome Kafka Part - Add subscription event outcome schema with java type for pojos - Add subscription event outcome json for testing - Add mapper to convert subscription response to event outcome - Add a bean to handle subscription response outcome tasks - Change response consumer to publish outcome for client app - Change response timeout task to publish outcome for client app - Change subscription persistance to read datanodes - Add helper to extract cm handle to status mapping from data nodes event - Fix code smells Issue-ID: CPS-1507 Change-Id: I70195073490f456f014e53c1f59d1b6761d18cd4 Signed-off-by: halil.cakal --- .../avc/ForwardedSubscriptionEventCacheConfig.java | 4 +- .../api/impl/event/avc/ResponseTimeoutTask.java | 28 +++-- .../avc/SubscriptionEventResponseConsumer.java | 43 ++++--- .../impl/event/avc/SubscriptionOutcomeMapper.java | 88 ++++++++++++++ .../cps/ncmp/api/impl/events/EventsPublisher.java | 1 + .../ncmp/api/impl/events/avc/AvcEventConsumer.java | 7 +- .../avcsubscription/SubscriptionEventConsumer.java | 10 +- .../SubscriptionEventForwarder.java | 51 ++++---- .../SubscriptionEventResponseOutcome.java | 135 +++++++++++++++++++++ .../subscriptions/SubscriptionPersistence.java | 8 ++ .../subscriptions/SubscriptionPersistenceImpl.java | 29 +++-- .../cps/ncmp/api/impl/utils/DataNodeHelper.java | 75 ++++++++++++ 12 files changed, 410 insertions(+), 69 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java (limited to 'cps-ncmp-service/src/main/java/org/onap') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java index 443ebc627a..d2f16a71d4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ForwardedSubscriptionEventCacheConfig.java @@ -33,8 +33,10 @@ import org.springframework.context.annotation.Configuration; @Configuration public class ForwardedSubscriptionEventCacheConfig extends HazelcastCacheConfig { + public static final int SUBSCRIPTION_FORWARD_STARTED_TTL_SECS = 600; + private static final MapConfig forwardedSubscriptionEventCacheMapConfig = - createMapConfig("forwardedSubscriptionEventCacheMapConfig"); + createMapConfig("forwardedSubscriptionEventCacheMapConfig"); /** * Distributed instance of forwarded subscription information cache that contains subscription event diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java index e7edecfacc..9c7b79f733 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/ResponseTimeoutTask.java @@ -24,27 +24,35 @@ import com.hazelcast.map.IMap; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome; @Slf4j @RequiredArgsConstructor public class ResponseTimeoutTask implements Runnable { private final IMap> forwardedSubscriptionEventCache; - private final String subscriptionEventId; + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; + private final String subscriptionClientId; + private final String subscriptionName; @Override public void run() { + + try { + generateAndSendResponse(); + } catch (final Exception exception) { + log.info("Caught exception in Runnable for ResponseTimeoutTask. StackTrace: {}", + exception.toString()); + } + + } + + private void generateAndSendResponse() { + final String subscriptionEventId = subscriptionClientId + subscriptionName; if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); - if (dmiNames.isEmpty()) { - //TODO full outcome response here - log.info("placeholder to create full outcome response for subscriptionEventId: {}.", - subscriptionEventId); - } else { - //TODO partial outcome response here - log.info("placeholder to create partial outcome response for subscriptionEventId: {}.", - subscriptionEventId); - } + subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName, + dmiNames.isEmpty()); forwardedSubscriptionEventCache.remove(subscriptionEventId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java index c173862476..eb3daeb4da 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventResponseConsumer.java @@ -22,9 +22,12 @@ package org.onap.cps.ncmp.api.impl.event.avc; import com.hazelcast.map.IMap; import java.util.Set; +import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseMapper; +import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventResponseOutcome; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; @@ -38,13 +41,9 @@ import org.springframework.stereotype.Component; public class SubscriptionEventResponseConsumer { private final IMap> forwardedSubscriptionEventCache; - private final SubscriptionPersistence subscriptionPersistence; - private final SubscriptionEventResponseMapper subscriptionEventResponseMapper; - - @Value("${app.ncmp.avc.subscription-outcome-topic}") - private String subscriptionOutcomeEventTopic; + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; @Value("${notification.enabled:true}") private boolean notificationFeatureEnabled; @@ -55,30 +54,36 @@ public class SubscriptionEventResponseConsumer { /** * Consume subscription response event. * - * @param subscriptionEventResponse the event to be consumed + * @param subscriptionEventResponseConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) - public void consumeSubscriptionEventResponse(final SubscriptionEventResponse subscriptionEventResponse) { - log.info("subscription event response of clientId: {} is received.", subscriptionEventResponse.getClientId()); - final String subscriptionEventId = subscriptionEventResponse.getClientId() - + subscriptionEventResponse.getSubscriptionName(); - final boolean createOutcomeResponse; + public void consumeSubscriptionEventResponse( + final ConsumerRecord subscriptionEventResponseConsumerRecord) { + final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value(); + final String clientId = subscriptionEventResponse.getClientId(); + log.info("subscription event response of clientId: {} is received.", clientId); + final String subscriptionName = subscriptionEventResponse.getSubscriptionName(); + final String subscriptionEventId = clientId + subscriptionName; + boolean isFullOutcomeResponse = false; if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - forwardedSubscriptionEventCache.get(subscriptionEventId).remove(subscriptionEventResponse.getDmiName()); - createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); - if (createOutcomeResponse) { + final Set dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); + + dmiNames.remove(subscriptionEventResponse.getDmiName()); + forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + isFullOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); + + if (isFullOutcomeResponse) { forwardedSubscriptionEventCache.remove(subscriptionEventId); } - } else { - createOutcomeResponse = true; } if (subscriptionModelLoaderEnabled) { updateSubscriptionEvent(subscriptionEventResponse); } - if (createOutcomeResponse && notificationFeatureEnabled) { - log.info("placeholder to create full outcome response for subscriptionEventId: {}.", subscriptionEventId); - //TODO Create outcome response + if (isFullOutcomeResponse && notificationFeatureEnabled) { + subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName, + isFullOutcomeResponse); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java new file mode 100644 index 0000000000..2466bc36ec --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionOutcomeMapper.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; + +@Mapper(componentModel = "spring") +public interface SubscriptionOutcomeMapper { + + @Mapping(source = "clientId", target = "event.subscription.clientID") + @Mapping(source = "subscriptionName", target = "event.subscription.name") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets", + qualifiedByName = "mapStatusToCmHandleRejected") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets", + qualifiedByName = "mapStatusToCmHandleAccepted") + @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets", + qualifiedByName = "mapStatusToCmHandlePending") + SubscriptionEventOutcome toSubscriptionEventOutcome( + SubscriptionEventResponse subscriptionEventResponse); + + /** + * Maps StatusToCMHandle to list of TargetCmHandle rejected. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandleRejected") + default List mapStatusToCmHandleRejected(Map targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue())) + .map(target -> target.getKey()) + .collect(Collectors.toList()); + } + + /** + * Maps StatusToCMHandle to list of TargetCmHandle accepted. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandleAccepted") + default List mapStatusToCmHandleAccepted(Map targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue())) + .map(target -> target.getKey()) + .collect(Collectors.toList()); + } + + /** + * Maps StatusToCMHandle to list of TargetCmHandle pending. + * + * @param targets as a map + * @return TargetCmHandle list + */ + @Named("mapStatusToCmHandlePending") + default List mapStatusToCmHandlePending(Map targets) { + return targets.entrySet() + .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue())) + .map(target -> target.getKey()) + .collect(Collectors.toList()); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index b0b091a2f6..d92316dc58 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -50,6 +50,7 @@ public class EventsPublisher { * @param topicName valid topic name * @param eventKey message key * @param event message payload + * @deprecated This method is not needed anymore since the use of headers will be in place. */ @Deprecated public void publishEvent(final String topicName, final String eventKey, final T event) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index 3bf02f0b58..f37497abe6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -68,12 +68,13 @@ public class AvcEventConsumer { } private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { + final String eventId = "eventId"; final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader("eventId").value()); - eventHeaders.remove("eventId"); + (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); + eventHeaders.remove(eventId); log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, mutatedEventId); - eventHeaders.add(new RecordHeader("eventId", SerializationUtils.serialize(mutatedEventId))); + eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java index 7717db67a6..88b41d0075 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent; @@ -50,11 +51,13 @@ public class SubscriptionEventConsumer { /** * Consume the specified event. * - * @param subscriptionEvent the event to be consumed + * @param subscriptionEventConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"}) - public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + public void consumeSubscriptionEvent( + final ConsumerRecord subscriptionEventConsumerRecord) { + final SubscriptionEvent subscriptionEvent = subscriptionEventConsumerRecord.value(); final InnerSubscriptionEvent event = subscriptionEvent.getEvent(); final String eventDatastore = event.getPredicates().getDatastore(); if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { @@ -71,7 +74,8 @@ public class SubscriptionEventConsumer { event.getSubscription().getClientID(), event.getSubscription().getName()); if (notificationFeatureEnabled) { - subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, + subscriptionEventConsumerRecord.headers()); } } } else { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 4654b148c6..19a0f12b0b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Headers; +import org.onap.cps.ncmp.api.impl.event.avc.ForwardedSubscriptionEventCacheConfig; import org.onap.cps.ncmp.api.impl.event.avc.ResponseTimeoutTask; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; @@ -53,9 +55,8 @@ public class SubscriptionEventForwarder { private final InventoryPersistence inventoryPersistence; private final EventsPublisher eventsPublisher; private final IMap> forwardedSubscriptionEventCache; - + private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - @Value("${app.ncmp.avc.subscription-forward-topic-prefix}") private String dmiAvcSubscriptionTopicPrefix; @@ -67,7 +68,8 @@ public class SubscriptionEventForwarder { * * @param subscriptionEvent the event to be forwarded */ - public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, + final Headers eventHeaders) { final List cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); if (cmHandleTargets == null || cmHandleTargets.isEmpty() || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { @@ -84,36 +86,44 @@ public class SubscriptionEventForwarder { final Set dmisToRespond = new HashSet<>(dmiPropertiesPerCmHandleIdPerServiceName.keySet()); if (dmisToRespond.isEmpty()) { - log.info("placeholder to create full outcome response for subscriptionEventId: {}.", - subscriptionEvent.getEvent().getSubscription().getClientID() - + subscriptionEvent.getEvent().getSubscription().getName()); - //TODO outcome response with no cmhandles + final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName, true); } else { startResponseTimeout(subscriptionEvent, dmisToRespond); - forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders); + } + } + + private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set dmisToRespond) { + final String subscriptionClientId = subscriptionEvent.getEvent().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + + forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, + ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); + final ResponseTimeoutTask responseTimeoutTask = + new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome, + subscriptionClientId, subscriptionName); + try { + executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); + } catch (final RuntimeException ex) { + log.info("Caught exception in ScheduledExecutorService for ResponseTimeoutTask. StackTrace: {}", + ex.toString()); } } private void forwardEventToDmis(final Map>> dmiNameCmHandleMap, - final SubscriptionEvent subscriptionEvent) { + final SubscriptionEvent subscriptionEvent, + final Headers eventHeaders) { dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); final String eventKey = createEventKey(subscriptionEvent, dmiName); final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; - eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, subscriptionEvent); + eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, eventHeaders, subscriptionEvent); }); } - private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set dmisToRespond) { - final String subscriptionEventId = subscriptionEvent.getEvent().getSubscription().getClientID() - + subscriptionEvent.getEvent().getSubscription().getName(); - - forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond); - final ResponseTimeoutTask responseTimeoutTask = - new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventId); - executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); - } - private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { return subscriptionEvent.getEvent().getSubscription().getClientID() + "-" @@ -121,5 +131,4 @@ public class SubscriptionEventForwarder { + "-" + dmiName; } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java new file mode 100644 index 0000000000..ade3f22f4b --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java @@ -0,0 +1,135 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.onap.cps.ncmp.api.impl.event.avc.SubscriptionOutcomeMapper; +import org.onap.cps.ncmp.api.impl.events.EventsPublisher; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; +import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; +import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; +import org.onap.cps.spi.model.DataNode; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventResponseOutcome { + + private final SubscriptionPersistence subscriptionPersistence; + + private final EventsPublisher outcomeEventsPublisher; + + private final SubscriptionOutcomeMapper subscriptionOutcomeMapper; + + @Value("${app.ncmp.avc.subscription-outcome-topic:cm-avc-subscription-response}") + private String subscriptionOutcomeEventTopic; + + /** + * This is for construction of outcome message to be published for client apps. + * + * @param subscriptionClientId client id of the subscription. + * @param subscriptionName name of the subscription. + * @param isFullOutcomeResponse the flag to decide on complete or partial response to be generated. + */ + public void sendResponse(final String subscriptionClientId, final String subscriptionName, + final boolean isFullOutcomeResponse) { + final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse( + subscriptionClientId, subscriptionName, isFullOutcomeResponse); + final Headers headers = new RecordHeaders(); + final String subscriptionEventId = subscriptionClientId + subscriptionName; + outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic, + subscriptionEventId, headers, subscriptionEventOutcome); + } + + private SubscriptionEventOutcome generateResponse(final String subscriptionClientId, final String subscriptionName, + final boolean isFullOutcomeResponse) { + final Collection dataNodes = subscriptionPersistence.getDataNodesForSubscriptionEvent(); + final List> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes); + final List> cmHandleIdToStatus = + DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves); + return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName, + isFullOutcomeResponse); + } + + + private SubscriptionEventOutcome formSubscriptionOutcomeMessage( + final List> cmHandleIdToStatus, final String subscriptionClientId, + final String subscriptionName, final boolean isFullOutcomeResponse) { + + final SubscriptionEventResponse subscriptionEventResponse = toSubscriptionEventResponse( + cmHandleIdToStatus, subscriptionClientId, subscriptionName); + + final SubscriptionEventOutcome subscriptionEventOutcome = + subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse); + + if (isFullOutcomeResponse) { + subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME); + } else { + subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME); + } + + return subscriptionEventOutcome; + } + + private SubscriptionEventResponse toSubscriptionEventResponse( + final List> cmHandleIdToStatus, final String subscriptionClientId, + final String subscriptionName) { + final Map cmHandleIdToStatusMap = new HashMap<>(); + final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse(); + subscriptionEventResponse.setClientId(subscriptionClientId); + subscriptionEventResponse.setSubscriptionName(subscriptionName); + + for (final Collection cmHandleToStatusBucket: cmHandleIdToStatus) { + final Iterator bucketIterator = cmHandleToStatusBucket.iterator(); + while (bucketIterator.hasNext()) { + final String item = (String) bucketIterator.next(); + if ("PENDING".equals(item)) { + cmHandleIdToStatusMap.put((String) bucketIterator.next(), + SubscriptionStatus.PENDING); + } + if ("REJECTED".equals(item)) { + cmHandleIdToStatusMap.put((String) bucketIterator.next(), + SubscriptionStatus.REJECTED); + } + if ("ACCEPTED".equals(item)) { + cmHandleIdToStatusMap.put((String) bucketIterator.next(), + SubscriptionStatus.ACCEPTED); + } + } + } + subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap); + + return subscriptionEventResponse; + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java index 16d9b80f8b..f240c4510d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistence.java @@ -20,7 +20,9 @@ package org.onap.cps.ncmp.api.impl.subscriptions; +import java.util.Collection; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; +import org.onap.cps.spi.model.DataNode; public interface SubscriptionPersistence { @@ -31,4 +33,10 @@ public interface SubscriptionPersistence { */ void saveSubscriptionEvent(YangModelSubscriptionEvent yangModelSubscriptionEvent); + /** + * Get data nodes. + * + * @return the DataNode as collection. + */ + Collection getDataNodesForSubscriptionEvent(); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java index e8de083fde..9a063d6dfd 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java @@ -51,27 +51,32 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence { createSubscriptionEventJsonData(jsonObjectMapper.asJsonString(yangModelSubscriptionEvent)); final Collection dataNodes = cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); - final Optional optional = dataNodes.stream().findFirst(); - if (optional.isPresent() && optional.get().getChildDataNodes().isEmpty()) { - saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, false); - } else { - saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, true); - } + final Optional dataNodeFirst = dataNodes.stream().findFirst(); + final boolean isCreateOperation = + dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty(); + saveOrUpdateSubscriptionEventYangModel(subscriptionEventJsonData, isCreateOperation); } private void saveOrUpdateSubscriptionEventYangModel(final String subscriptionEventJsonData, - final boolean isDataNodeExist) { - if (isDataNodeExist) { - log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData); - cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, - SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP); - } else { + final boolean isCreateOperation) { + if (isCreateOperation) { log.info("SubscriptionEventJsonData to be saved into DB {}", subscriptionEventJsonData); cpsDataService.saveListElements(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP); + } else { + log.info("SubscriptionEventJsonData to be updated into DB {}", subscriptionEventJsonData); + cpsDataService.updateDataNodeAndDescendants(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + SUBSCRIPTION_REGISTRY_PARENT, subscriptionEventJsonData, NO_TIMESTAMP); } } + @Override + public Collection getDataNodesForSubscriptionEvent() { + return cpsDataService.getDataNodes(SUBSCRIPTION_DATASPACE_NAME, + SUBSCRIPTION_ANCHOR_NAME, SUBSCRIPTION_REGISTRY_PARENT, + FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); + } + private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) { return "{\"subscription\":[" + yangModelSubscriptionAsJson + "]}"; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java new file mode 100644 index 0000000000..2fec59b736 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.onap.cps.spi.model.DataNode; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DataNodeHelper { + + /** + * The nested DataNode object is being flattened. + * + * @param dataNode object. + * @return DataNode as stream. + */ + public static Stream flatten(final DataNode dataNode) { + return Stream.concat(Stream.of(dataNode), + dataNode.getChildDataNodes().stream().flatMap(DataNodeHelper::flatten)); + } + + /** + * The leaves for each DataNode is listed as map. + * + * @param dataNodes as collection. + * @return list of map for the all leaves. + */ + public static List> getDataNodeLeaves(final Collection dataNodes) { + return dataNodes.stream() + .flatMap(DataNodeHelper::flatten) + .map(node -> node.getLeaves()) + .collect(Collectors.toList()); + } + + /** + * The cm handle and status is listed as a collection. + * + * @param dataNodeLeaves as a list of map. + * @return list of collection containing cm handle id and statuses. + */ + public static List> getCmHandleIdToStatus( + final List> dataNodeLeaves) { + return dataNodeLeaves.stream() + .map(target -> target.values()) + .filter(col -> col.contains("PENDING") + | col.contains("ACCEPTED") + | col.contains("REJECTED")) + .collect(Collectors.toList()); + } +} -- cgit 1.2.3-korg