diff options
author | halil.cakal <halil.cakal@est.tech> | 2023-07-13 11:28:18 +0100 |
---|---|---|
committer | halil.cakal <halil.cakal@est.tech> | 2023-07-26 17:03:21 +0100 |
commit | d789956fbf88f856472f975487c1975df91dbe3e (patch) | |
tree | 5969f9432199e0689d3bd150645a774ecefa3ac0 /cps-ncmp-service/src/main/java | |
parent | dcf84ad73f0301ef41049e692b9963f6dcac3661 (diff) |
Subscription Creation: NCMP to Client CloudEvent transformation
- Delete legacy avc subscription event and event outcome schemas
- Change subscription response and outcome sample json file contents
- Change ncmp event response code to support avc subscriptions
- Add mapper that maps cloud event to subscription event response
- Add mapper that maps subscription event outcome to cloud event
- Change subscription event response consumer to consume CloudEvents
- Change time out task to support response event instead client id and
name
- Change subscription event response mapper to support Cloud Event
- Change subscription outcome mapper to group subscription responses as
per details and status
- Change subscription status to have fromString functionality
- Change all unit tests to support new functionalities
- Add cps exceptions for cloud event and outcome type
- Add details field in yang model
- Change data node helper to supoort details field
- Consolidate final subscription response codes
- Fix code smells reported by SonarLint
Issue-ID: CPS-1739
Change-Id: I5eadeb8ef40d3d7befb762b5a8d2139fe3c85d7e
Signed-off-by: halil.cakal <halil.cakal@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main/java')
16 files changed, 461 insertions, 255 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java index d250c36a80..3b11249838 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java @@ -26,10 +26,14 @@ import lombok.Getter; public enum NcmpEventResponseCode { SUCCESS("0", "Successfully applied changes"), + SUCCESSFULLY_APPLIED_SUBSCRIPTION("1", "successfully applied subscription"), CM_HANDLES_NOT_FOUND("100", "cm handle id(s) not found"), CM_HANDLES_NOT_READY("101", "cm handle(s) not ready"), DMI_SERVICE_NOT_RESPONDING("102", "dmi plugin service is not responding"), - UNABLE_TO_READ_RESOURCE_DATA("103", "dmi plugin service is not able to read resource data"); + UNABLE_TO_READ_RESOURCE_DATA("103", "dmi plugin service is not able to read resource data"), + PARTIALLY_APPLIED_SUBSCRIPTION("104", "partially applied subscription"), + SUBSCRIPTION_NOT_APPLICABLE("105", "subscription not applicable for all cm handles"), + SUBSCRIPTION_PENDING("106", "subscription pending for all cm handles"); private final String statusCode; private final String statusMessage; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java index c178700eed..176e644bae 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ResponseTimeoutTask.java @@ -24,6 +24,7 @@ import com.hazelcast.map.IMap; import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; @Slf4j @RequiredArgsConstructor @@ -31,8 +32,7 @@ public class ResponseTimeoutTask implements Runnable { private final IMap<String, Set<String>> forwardedSubscriptionEventCache; private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; - private final String subscriptionClientId; - private final String subscriptionName; + private final SubscriptionEventResponse subscriptionEventResponse; @Override public void run() { @@ -47,9 +47,12 @@ public class ResponseTimeoutTask implements Runnable { } private void generateAndSendResponse() { + final String subscriptionClientId = subscriptionEventResponse.getData().getClientId(); + final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName(); final String subscriptionEventId = subscriptionClientId + subscriptionName; if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { - subscriptionEventResponseOutcome.sendResponse(subscriptionClientId, subscriptionName); + subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, + "subscriptionCreatedStatus"); forwardedSubscriptionEventCache.remove(subscriptionEventId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java index f511965c77..5afc52d7e7 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java @@ -58,22 +58,23 @@ public class SubscriptionEventConsumer { containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) { final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value(); + final String eventType = subscriptionEventConsumerRecord.value().getType(); final SubscriptionEvent subscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent); final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore(); - if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { + if (!eventDatastore.equals("passthrough-running")) { throw new OperationNotYetSupportedException( - "passthrough datastores are currently only supported for event subscriptions"); + "passthrough-running datastores are currently only supported for event subscriptions"); } if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) { if (subscriptionModelLoaderEnabled) { persistSubscriptionEvent(subscriptionEvent); } - if ("CREATE".equals(cloudEvent.getType())) { + if ("subscriptionCreated".equals(cloudEvent.getType())) { log.info("Subscription for ClientID {} with name {} ...", subscriptionEvent.getData().getSubscription().getClientID(), subscriptionEvent.getData().getSubscription().getName()); if (notificationFeatureEnabled) { - subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, eventType); } } } else { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 1fe963a279..f196cb01e9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -44,6 +44,8 @@ import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle; import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; import org.springframework.beans.factory.annotation.Value; @@ -74,7 +76,7 @@ public class SubscriptionEventForwarder { * * @param subscriptionEvent the event to be forwarded */ - public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, final String eventType) { final List<String> cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets(); if (cmHandleTargets == null || cmHandleTargets.isEmpty() || cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) { @@ -85,13 +87,19 @@ public class SubscriptionEventForwarder { inventoryPersistence.getYangModelCmHandles(cmHandleTargets); final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); - findDmisAndRespond(subscriptionEvent, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName); + findDmisAndRespond(subscriptionEvent, eventType, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName); } - private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, + private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final String eventType, final List<String> cmHandleTargetsAsStrings, final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName) { + final SubscriptionEventResponse emptySubscriptionEventResponse = + new SubscriptionEventResponse().withData(new Data()); + emptySubscriptionEventResponse.getData().setSubscriptionName( + subscriptionEvent.getData().getSubscription().getName()); + emptySubscriptionEventResponse.getData().setClientId( + subscriptionEvent.getData().getSubscription().getClientID()); final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream() .map(Map.Entry::getValue).map(Map::keySet).flatMap(Set::stream).collect(Collectors.toList()); @@ -104,27 +112,27 @@ public class SubscriptionEventForwarder { updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb); } if (dmisToRespond.isEmpty()) { - final String clientID = subscriptionEvent.getData().getSubscription().getClientID(); - final String subscriptionName = subscriptionEvent.getData().getSubscription().getName(); - subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName); + subscriptionEventResponseOutcome.sendResponse(emptySubscriptionEventResponse, + "subscriptionCreatedStatus"); } else { - startResponseTimeout(subscriptionEvent, dmisToRespond); + startResponseTimeout(emptySubscriptionEventResponse, dmisToRespond); final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent = clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent); - forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent, eventType); } } - private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) { - final String subscriptionClientId = subscriptionEvent.getData().getSubscription().getClientID(); - final String subscriptionName = subscriptionEvent.getData().getSubscription().getName(); + private void startResponseTimeout(final SubscriptionEventResponse emptySubscriptionEventResponse, + final Set<String> dmisToRespond) { + final String subscriptionClientId = emptySubscriptionEventResponse.getData().getClientId(); + final String subscriptionName = emptySubscriptionEventResponse.getData().getSubscriptionName(); final String subscriptionEventId = subscriptionClientId + subscriptionName; forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); final ResponseTimeoutTask responseTimeoutTask = new ResponseTimeoutTask(forwardedSubscriptionEventCache, subscriptionEventResponseOutcome, - subscriptionClientId, subscriptionName); + emptySubscriptionEventResponse); try { executorService.schedule(responseTimeoutTask, dmiResponseTimeoutInMs, TimeUnit.MILLISECONDS); } catch (final RuntimeException ex) { @@ -135,7 +143,7 @@ public class SubscriptionEventForwarder { private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap, final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent - ncmpSubscriptionEvent) { + ncmpSubscriptionEvent, final String eventType) { dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map( cmHandleAndProperties -> { @@ -150,7 +158,7 @@ public class SubscriptionEventForwarder { final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; final CloudEvent ncmpSubscriptionCloudEvent = - SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey); + SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey, eventType); eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent); }); } @@ -182,6 +190,7 @@ public class SubscriptionEventForwarder { return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles().stream() .filter(targetCmHandle -> targetCmHandlesDoesNotExistInDb.contains(targetCmHandle.getCmHandleId())) .map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.getCmHandleId(), - SubscriptionStatus.REJECTED)).collect(Collectors.toList()); + SubscriptionStatus.REJECTED, "Targets not found")) + .collect(Collectors.toList()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java index bf9ceb1c3d..35d94cc7a2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; @@ -47,8 +46,7 @@ public interface SubscriptionEventMapper { */ @Named("mapTargetsToCmHandleTargets") default List<YangModelSubscriptionEvent.TargetCmHandle> mapTargetsToCmHandleTargets(List<String> targets) { - return targets.stream().map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target, - SubscriptionStatus.PENDING)) + return targets.stream().map(YangModelSubscriptionEvent.TargetCmHandle::new) .collect(Collectors.toList()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java index 20df706c07..ddb9fd6fcf 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseConsumer.java @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; import com.hazelcast.map.IMap; +import io.cloudevents.CloudEvent; import java.util.Collection; import java.util.Map; import java.util.Set; @@ -32,8 +33,9 @@ import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEven import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; +import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventResponseCloudMapper; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; import org.onap.cps.spi.model.DataNode; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; @@ -61,19 +63,21 @@ public class SubscriptionEventResponseConsumer { * @param subscriptionEventResponseConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.api.models.SubscriptionEventResponse"}) + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") public void consumeSubscriptionEventResponse( - final ConsumerRecord<String, SubscriptionEventResponse> subscriptionEventResponseConsumerRecord) { - final SubscriptionEventResponse subscriptionEventResponse = subscriptionEventResponseConsumerRecord.value(); - final String clientId = subscriptionEventResponse.getClientId(); + final ConsumerRecord<String, CloudEvent> subscriptionEventResponseConsumerRecord) { + final CloudEvent cloudEvent = subscriptionEventResponseConsumerRecord.value(); + final String eventType = subscriptionEventResponseConsumerRecord.value().getType(); + final SubscriptionEventResponse subscriptionEventResponse = + SubscriptionEventResponseCloudMapper.toSubscriptionEventResponse(cloudEvent); + final String clientId = subscriptionEventResponse.getData().getClientId(); log.info("subscription event response of clientId: {} is received.", clientId); - final String subscriptionName = subscriptionEventResponse.getSubscriptionName(); + final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName(); final String subscriptionEventId = clientId + subscriptionName; boolean createOutcomeResponse = false; if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) { final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId); - - dmiNames.remove(subscriptionEventResponse.getDmiName()); + dmiNames.remove(subscriptionEventResponse.getData().getDmiName()); forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames, ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS); createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty(); @@ -84,7 +88,7 @@ public class SubscriptionEventResponseConsumer { if (createOutcomeResponse && notificationFeatureEnabled && hasNoPendingCmHandles(clientId, subscriptionName)) { - subscriptionEventResponseOutcome.sendResponse(clientId, subscriptionName); + subscriptionEventResponseOutcome.sendResponse(subscriptionEventResponse, eventType); forwardedSubscriptionEventCache.remove(subscriptionEventId); } } @@ -92,10 +96,15 @@ public class SubscriptionEventResponseConsumer { private boolean hasNoPendingCmHandles(final String clientId, final String subscriptionName) { final Collection<DataNode> dataNodeSubscription = subscriptionPersistence.getCmHandlesForSubscriptionEvent( clientId, subscriptionName); - final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = - DataNodeHelper.getCmHandleIdToStatusMapFromDataNodes( - dataNodeSubscription); - return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING); + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal = + DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodeSubscription); + for (final Map<String, String> statusAndDetailsMap : cmHandleIdToStatusAndDetailsAsMapOriginal.values()) { + final String status = statusAndDetailsMap.get("status"); + if (SubscriptionStatus.PENDING.toString().equals(status)) { + return false; + } + } + return true; } private void updateSubscriptionEvent(final SubscriptionEventResponse subscriptionEventResponse) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java index 44181c57c9..dc122ee5d1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapper.java @@ -21,36 +21,35 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus; @Mapper(componentModel = "spring") public interface SubscriptionEventResponseMapper { - @Mapping(source = "clientId", target = "clientId") - @Mapping(source = "subscriptionName", target = "subscriptionName") - @Mapping(source = "cmHandleIdToStatus", target = "predicates.targetCmHandles", - qualifiedByName = "mapStatusToCmHandleTargets") + @Mapping(source = "data.clientId", target = "clientId") + @Mapping(source = "data.subscriptionName", target = "subscriptionName") + @Mapping(source = "data.subscriptionStatus", target = "predicates.targetCmHandles", + qualifiedByName = "mapSubscriptionStatusToCmHandleTargets") YangModelSubscriptionEvent toYangModelSubscriptionEvent( SubscriptionEventResponse subscriptionEventResponse); /** - * Maps StatusToCMHandle to list of TargetCmHandle. + * Maps SubscriptionStatus to list of TargetCmHandle. * - * @param targets as a map + * @param subscriptionStatus as a list * @return TargetCmHandle list */ - @Named("mapStatusToCmHandleTargets") - default List<YangModelSubscriptionEvent.TargetCmHandle> mapStatusToCmHandleTargets( - Map<String, SubscriptionStatus> targets) { - return targets.entrySet().stream().map(target -> - new YangModelSubscriptionEvent.TargetCmHandle(target.getKey(), target.getValue())).collect( - Collectors.toList()); + @Named("mapSubscriptionStatusToCmHandleTargets") + default List<YangModelSubscriptionEvent.TargetCmHandle> mapSubscriptionStatusToCmHandleTargets( + List<SubscriptionStatus> subscriptionStatus) { + return subscriptionStatus.stream().map(status -> new YangModelSubscriptionEvent.TargetCmHandle(status.getId(), + org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus.fromString(status.getStatus().value()), + status.getDetails())).collect(Collectors.toList()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java index 8fdff17944..9ed686529d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseOutcome.java @@ -20,21 +20,20 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; -import java.io.Serializable; -import java.util.Collection; +import io.cloudevents.CloudEvent; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; +import org.onap.cps.ncmp.api.NcmpEventResponseCode; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.utils.DataNodeHelper; -import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; -import org.onap.cps.spi.model.DataNode; +import org.onap.cps.ncmp.api.impl.utils.SubscriptionOutcomeCloudMapper; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -45,75 +44,106 @@ public class SubscriptionEventResponseOutcome { private final SubscriptionPersistence subscriptionPersistence; - private final EventsPublisher<SubscriptionEventOutcome> outcomeEventsPublisher; + private final EventsPublisher<CloudEvent> outcomeEventsPublisher; private final SubscriptionOutcomeMapper subscriptionOutcomeMapper; - @Value("${app.ncmp.avc.subscription-outcome-topic:cm-avc-subscription-response}") + @Value("${app.ncmp.avc.subscription-outcome-topic:subscription-response}") private String subscriptionOutcomeEventTopic; /** * This is for construction of outcome message to be published for client apps. * - * @param subscriptionClientId client id of the subscription. - * @param subscriptionName name of the subscription. + * @param subscriptionEventResponse event produced by Dmi Plugin */ - public void sendResponse(final String subscriptionClientId, final String subscriptionName) { - final SubscriptionEventOutcome subscriptionEventOutcome = generateResponse( - subscriptionClientId, subscriptionName); - final Headers headers = new RecordHeaders(); + public void sendResponse(final SubscriptionEventResponse subscriptionEventResponse, final String eventKey) { + final SubscriptionEventOutcome subscriptionEventOutcome = + formSubscriptionOutcomeMessage(subscriptionEventResponse); + final String subscriptionClientId = subscriptionEventResponse.getData().getClientId(); + final String subscriptionName = subscriptionEventResponse.getData().getSubscriptionName(); final String subscriptionEventId = subscriptionClientId + subscriptionName; - outcomeEventsPublisher.publishEvent(subscriptionOutcomeEventTopic, - subscriptionEventId, headers, subscriptionEventOutcome); + final CloudEvent subscriptionOutcomeCloudEvent = + SubscriptionOutcomeCloudMapper.toCloudEvent(subscriptionEventOutcome, + subscriptionEventId, eventKey); + outcomeEventsPublisher.publishCloudEvent(subscriptionOutcomeEventTopic, + subscriptionEventId, subscriptionOutcomeCloudEvent); } - private SubscriptionEventOutcome generateResponse(final String subscriptionClientId, - final String subscriptionName) { - final Collection<DataNode> dataNodes = - subscriptionPersistence.getCmHandlesForSubscriptionEvent(subscriptionClientId, subscriptionName); - final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes); - final List<Collection<Serializable>> cmHandleIdToStatus = - DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves); - final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = - DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus); - return formSubscriptionOutcomeMessage(cmHandleIdToStatus, subscriptionClientId, subscriptionName, - isFullOutcomeResponse(cmHandleIdToStatusMap)); + private SubscriptionEventOutcome formSubscriptionOutcomeMessage( + final SubscriptionEventResponse subscriptionEventResponse) { + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap = + DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode( + subscriptionPersistence.getCmHandlesForSubscriptionEvent( + subscriptionEventResponse.getData().getClientId(), + subscriptionEventResponse.getData().getSubscriptionName())); + final List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus> + subscriptionStatusList = mapCmHandleIdStatusDetailsMapToSubscriptionStatusList( + cmHandleIdToStatusAndDetailsAsMap); + subscriptionEventResponse.getData().setSubscriptionStatus(subscriptionStatusList); + return fromSubscriptionEventResponse(subscriptionEventResponse, + decideOnNcmpEventResponseCodeForSubscription(cmHandleIdToStatusAndDetailsAsMap)); } - private boolean isFullOutcomeResponse(final Map<String, SubscriptionStatus> cmHandleIdToStatusMap) { - return !cmHandleIdToStatusMap.values().contains(SubscriptionStatus.PENDING); + private static List<org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus> + mapCmHandleIdStatusDetailsMapToSubscriptionStatusList( + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) { + return cmHandleIdToStatusAndDetailsAsMap.entrySet() + .stream().map(entryset -> { + final org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus + subscriptionStatus = new org.onap.cps.ncmp.events.avcsubscription1_0_0 + .dmi_to_ncmp.SubscriptionStatus(); + final String cmHandleId = entryset.getKey(); + final Map<String, String> statusAndDetailsMap = entryset.getValue(); + final String status = statusAndDetailsMap.get("status"); + final String details = statusAndDetailsMap.get("details"); + subscriptionStatus.setId(cmHandleId); + subscriptionStatus.setStatus( + org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp + .SubscriptionStatus.Status.fromValue(status)); + subscriptionStatus.setDetails(details); + return subscriptionStatus; + }).collect(Collectors.toList()); } - private SubscriptionEventOutcome formSubscriptionOutcomeMessage( - final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId, - final String subscriptionName, final boolean isFullOutcomeResponse) { + private NcmpEventResponseCode decideOnNcmpEventResponseCodeForSubscription( + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap) { - final SubscriptionEventResponse subscriptionEventResponse = toSubscriptionEventResponse( - cmHandleIdToStatus, subscriptionClientId, subscriptionName); + final boolean isAllTargetsPending = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, + SubscriptionStatus.PENDING); - final SubscriptionEventOutcome subscriptionEventOutcome = - subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse); + final boolean isAllTargetsRejected = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, + SubscriptionStatus.REJECTED); + + final boolean isAllTargetsAccepted = isAllTargetCmHandleStatusMatch(cmHandleIdToStatusAndDetailsAsMap, + SubscriptionStatus.ACCEPTED); - if (isFullOutcomeResponse) { - subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.COMPLETE_OUTCOME); + if (isAllTargetsAccepted) { + return NcmpEventResponseCode.SUCCESSFULLY_APPLIED_SUBSCRIPTION; + } else if (isAllTargetsRejected) { + return NcmpEventResponseCode.SUBSCRIPTION_NOT_APPLICABLE; + } else if (isAllTargetsPending) { + return NcmpEventResponseCode.SUBSCRIPTION_PENDING; } else { - subscriptionEventOutcome.setEventType(SubscriptionEventOutcome.EventType.PARTIAL_OUTCOME); + return NcmpEventResponseCode.PARTIALLY_APPLIED_SUBSCRIPTION; } + } - return subscriptionEventOutcome; + private boolean isAllTargetCmHandleStatusMatch( + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap, + final SubscriptionStatus subscriptionStatus) { + return cmHandleIdToStatusAndDetailsAsMap.values().stream() + .allMatch(entryset -> entryset.containsValue(subscriptionStatus.toString())); } - private SubscriptionEventResponse toSubscriptionEventResponse( - final List<Collection<Serializable>> cmHandleIdToStatus, final String subscriptionClientId, - final String subscriptionName) { - final Map<String, SubscriptionStatus> cmHandleIdToStatusMap = - DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus); + private SubscriptionEventOutcome fromSubscriptionEventResponse( + final SubscriptionEventResponse subscriptionEventResponse, + final NcmpEventResponseCode ncmpEventResponseCode) { - final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse(); - subscriptionEventResponse.setClientId(subscriptionClientId); - subscriptionEventResponse.setSubscriptionName(subscriptionName); - subscriptionEventResponse.setCmHandleIdToStatus(cmHandleIdToStatusMap); + final SubscriptionEventOutcome subscriptionEventOutcome = + subscriptionOutcomeMapper.toSubscriptionEventOutcome(subscriptionEventResponse); + subscriptionEventOutcome.getData().setStatusCode(Integer.parseInt(ncmpEventResponseCode.getStatusCode())); + subscriptionEventOutcome.getData().setStatusMessage(ncmpEventResponseCode.getStatusMessage()); - return subscriptionEventResponse; + return subscriptionEventOutcome; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java index cecde5f816..7803b982f3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionOutcomeMapper.java @@ -26,63 +26,80 @@ import java.util.stream.Collectors; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; -import org.onap.cps.ncmp.api.models.SubscriptionEventResponse; -import org.onap.cps.ncmp.events.avc.subscription.v1.SubscriptionEventOutcome; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfo; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.AdditionalInfoDetail; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome; +import org.onap.cps.spi.exceptions.DataValidationException; @Mapper(componentModel = "spring") public interface SubscriptionOutcomeMapper { - @Mapping(source = "clientId", target = "event.subscription.clientID") - @Mapping(source = "subscriptionName", target = "event.subscription.name") - @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.rejectedTargets", - qualifiedByName = "mapStatusToCmHandleRejected") - @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.acceptedTargets", - qualifiedByName = "mapStatusToCmHandleAccepted") - @Mapping(source = "cmHandleIdToStatus", target = "event.predicates.pendingTargets", - qualifiedByName = "mapStatusToCmHandlePending") - SubscriptionEventOutcome toSubscriptionEventOutcome( - SubscriptionEventResponse subscriptionEventResponse); + @Mapping(source = "data.subscriptionStatus", target = "data.additionalInfo", + qualifiedByName = "mapListOfSubscriptionStatusToAdditionalInfo") + SubscriptionEventOutcome toSubscriptionEventOutcome(SubscriptionEventResponse subscriptionEventResponse); /** - * Maps StatusToCMHandle to list of TargetCmHandle rejected. + * Maps list of SubscriptionStatus to an AdditionalInfo. * - * @param targets as a map - * @return TargetCmHandle list + * @param subscriptionStatusList containing details + * @return an AdditionalInfo */ - @Named("mapStatusToCmHandleRejected") - default List<Object> mapStatusToCmHandleRejected(Map<String, SubscriptionStatus> targets) { - return targets.entrySet() - .stream().filter(target -> SubscriptionStatus.REJECTED.equals(target.getValue())) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); + @Named("mapListOfSubscriptionStatusToAdditionalInfo") + default AdditionalInfo mapListOfSubscriptionStatusToAdditionalInfo( + final List<SubscriptionStatus> subscriptionStatusList) { + if (subscriptionStatusList == null || subscriptionStatusList.isEmpty()) { + throw new DataValidationException("Invalid subscriptionStatusList", + "SubscriptionStatus list cannot be null or empty"); + } + + final Map<String, List<SubscriptionStatus>> rejectedSubscriptionsPerDetails = getSubscriptionsPerDetails( + subscriptionStatusList, SubscriptionStatus.Status.REJECTED); + final Map<String, List<String>> rejectedCmHandlesPerDetails = + getCmHandlesPerDetails(rejectedSubscriptionsPerDetails); + final List<AdditionalInfoDetail> rejectedCmHandles = getAdditionalInfoDetailList(rejectedCmHandlesPerDetails); + + + final Map<String, List<SubscriptionStatus>> pendingSubscriptionsPerDetails = getSubscriptionsPerDetails( + subscriptionStatusList, SubscriptionStatus.Status.PENDING); + final Map<String, List<String>> pendingCmHandlesPerDetails = + getCmHandlesPerDetails(pendingSubscriptionsPerDetails); + final List<AdditionalInfoDetail> pendingCmHandles = getAdditionalInfoDetailList(pendingCmHandlesPerDetails); + + final AdditionalInfo additionalInfo = new AdditionalInfo(); + additionalInfo.setRejected(rejectedCmHandles); + additionalInfo.setPending(pendingCmHandles); + + return additionalInfo; } - /** - * Maps StatusToCMHandle to list of TargetCmHandle accepted. - * - * @param targets as a map - * @return TargetCmHandle list - */ - @Named("mapStatusToCmHandleAccepted") - default List<Object> mapStatusToCmHandleAccepted(Map<String, SubscriptionStatus> targets) { - return targets.entrySet() - .stream().filter(target -> SubscriptionStatus.ACCEPTED.equals(target.getValue())) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); + private static Map<String, List<SubscriptionStatus>> getSubscriptionsPerDetails( + final List<SubscriptionStatus> subscriptionStatusList, final SubscriptionStatus.Status status) { + return subscriptionStatusList.stream() + .filter(subscriptionStatus -> subscriptionStatus.getStatus() == status) + .collect(Collectors.groupingBy(SubscriptionStatus::getDetails)); } - /** - * Maps StatusToCMHandle to list of TargetCmHandle pending. - * - * @param targets as a map - * @return TargetCmHandle list - */ - @Named("mapStatusToCmHandlePending") - default List<Object> mapStatusToCmHandlePending(Map<String, SubscriptionStatus> targets) { - return targets.entrySet() - .stream().filter(target -> SubscriptionStatus.PENDING.equals(target.getValue())) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); + private static Map<String, List<String>> getCmHandlesPerDetails( + final Map<String, List<SubscriptionStatus>> subscriptionsPerDetails) { + return subscriptionsPerDetails.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().stream() + .map(SubscriptionStatus::getId) + .collect(Collectors.toList()) + )); + } + + private static List<AdditionalInfoDetail> getAdditionalInfoDetailList( + final Map<String, List<String>> cmHandlesPerDetails) { + return cmHandlesPerDetails.entrySet().stream() + .map(entry -> { + final AdditionalInfoDetail detail = new AdditionalInfoDetail(); + detail.setDetails(entry.getKey()); + detail.setTargets(entry.getValue()); + return detail; + }).collect(Collectors.toList()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java index d2b1237a4d..83a375b1b8 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionPersistenceImpl.java @@ -22,7 +22,6 @@ package org.onap.cps.ncmp.api.impl.subscriptions; import static org.onap.cps.ncmp.api.impl.constants.DmiRegistryConstants.NO_TIMESTAMP; -import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -70,33 +69,46 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence { private void findDeltaCmHandlesAddOrUpdateInDatabase(final YangModelSubscriptionEvent yangModelSubscriptionEvent, final String clientId, final String subscriptionName, final Collection<DataNode> dataNodes) { - final Map<String, SubscriptionStatus> cmHandleIdsFromYangModel = + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapNew = extractCmHandleFromYangModelAsMap(yangModelSubscriptionEvent); - final Map<String, SubscriptionStatus> cmHandleIdsFromDatabase = - extractCmHandleFromDbAsMap(dataNodes); + final Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapOriginal = + DataNodeHelper.cmHandleIdToStatusAndDetailsAsMapFromDataNode(dataNodes); - final Map<String, SubscriptionStatus> newCmHandles = - mapDifference(cmHandleIdsFromYangModel, cmHandleIdsFromDatabase); - traverseCmHandleList(newCmHandles, clientId, subscriptionName, true); + final Map<String, Map<String, String>> newTargetCmHandles = + mapDifference(cmHandleIdToStatusAndDetailsAsMapNew, + cmHandleIdToStatusAndDetailsAsMapOriginal); + traverseCmHandleList(newTargetCmHandles, clientId, subscriptionName, true); - final Map<String, SubscriptionStatus> existingCmHandles = - mapDifference(cmHandleIdsFromYangModel, newCmHandles); - traverseCmHandleList(existingCmHandles, clientId, subscriptionName, false); + final Map<String, Map<String, String>> existingTargetCmHandles = + mapDifference(cmHandleIdToStatusAndDetailsAsMapNew, newTargetCmHandles); + traverseCmHandleList(existingTargetCmHandles, clientId, subscriptionName, false); } - private boolean isSubscriptionRegistryEmptyOrNonExist(final Collection<DataNode> dataNodes, - final String clientId, final String subscriptionName) { - final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst(); - return ((dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty()) - || getCmHandlesForSubscriptionEvent(clientId, subscriptionName).isEmpty()); - } - - private void traverseCmHandleList(final Map<String, SubscriptionStatus> cmHandleMap, + private static Map<String, Map<String, String>> extractCmHandleFromYangModelAsMap( + final YangModelSubscriptionEvent yangModelSubscriptionEvent) { + return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles() + .stream().collect( + HashMap<String, Map<String, String>>::new, + (result, cmHandle) -> { + final String cmHandleId = cmHandle.getCmHandleId(); + final SubscriptionStatus status = cmHandle.getStatus(); + final String details = cmHandle.getDetails(); + + if (cmHandleId != null && status != null) { + result.put(cmHandleId, new HashMap<>()); + result.get(cmHandleId).put("status", status.toString()); + result.get(cmHandleId).put("details", details == null ? "" : details); + } + }, + HashMap::putAll + ); + } + + private void traverseCmHandleList(final Map<String, Map<String, String>> cmHandleMap, final String clientId, final String subscriptionName, final boolean isAddListElementOperation) { - final List<YangModelSubscriptionEvent.TargetCmHandle> cmHandleList = - targetCmHandlesAsList(cmHandleMap); + final List<YangModelSubscriptionEvent.TargetCmHandle> cmHandleList = targetCmHandlesAsList(cmHandleMap); for (final YangModelSubscriptionEvent.TargetCmHandle targetCmHandle : cmHandleList) { final String targetCmHandleAsJson = createTargetCmHandleJsonData(jsonObjectMapper.asJsonString(targetCmHandle)); @@ -105,6 +117,13 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence { } } + private boolean isSubscriptionRegistryEmptyOrNonExist(final Collection<DataNode> dataNodes, + final String clientId, final String subscriptionName) { + final Optional<DataNode> dataNodeFirst = dataNodes.stream().findFirst(); + return ((dataNodeFirst.isPresent() && dataNodeFirst.get().getChildDataNodes().isEmpty()) + || getCmHandlesForSubscriptionEvent(clientId, subscriptionName).isEmpty()); + } + private void addOrReplaceCmHandlePredicateListElement(final String targetCmHandleAsJson, final String clientId, final String subscriptionName, @@ -142,25 +161,16 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence { FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); } - private static Map<String, SubscriptionStatus> extractCmHandleFromDbAsMap(final Collection<DataNode> dataNodes) { - final List<Map<String, Serializable>> dataNodeLeaves = DataNodeHelper.getDataNodeLeaves(dataNodes); - final List<Collection<Serializable>> cmHandleIdToStatus = DataNodeHelper.getCmHandleIdToStatus(dataNodeLeaves); - return DataNodeHelper.getCmHandleIdToStatusMap(cmHandleIdToStatus); - } - - private static Map<String, SubscriptionStatus> extractCmHandleFromYangModelAsMap( - final YangModelSubscriptionEvent yangModelSubscriptionEvent) { - return yangModelSubscriptionEvent.getPredicates().getTargetCmHandles() - .stream().collect(Collectors.toMap( - YangModelSubscriptionEvent.TargetCmHandle::getCmHandleId, - YangModelSubscriptionEvent.TargetCmHandle::getStatus)); - } - private static List<YangModelSubscriptionEvent.TargetCmHandle> targetCmHandlesAsList( - final Map<String, SubscriptionStatus> newCmHandles) { - return newCmHandles.entrySet().stream().map(entry -> - new YangModelSubscriptionEvent.TargetCmHandle(entry.getKey(), - entry.getValue())).collect(Collectors.toList()); + final Map<String, Map<String, String>> newCmHandles) { + return newCmHandles.entrySet().stream().map(entry -> { + final String cmHandleId = entry.getKey(); + final Map<String, String> statusAndDetailsMap = entry.getValue(); + final String status = statusAndDetailsMap.get("status"); + final String details = statusAndDetailsMap.get("details"); + return new YangModelSubscriptionEvent.TargetCmHandle(cmHandleId, + SubscriptionStatus.fromString(status), details); + }).collect(Collectors.toList()); } private static String createSubscriptionEventJsonData(final String yangModelSubscriptionAsJson) { @@ -181,9 +191,9 @@ public class SubscriptionPersistenceImpl implements SubscriptionPersistence { + "' and @subscriptionName='" + subscriptionName + "']"; } - private static <K, V> Map<K, V> mapDifference(final Map<? extends K, ? extends V> left, - final Map<? extends K, ? extends V> right) { - final Map<K, V> difference = new HashMap<>(); + private static <K, L, M> Map<K, Map<L, M>> mapDifference(final Map<K, Map<L, M>> left, + final Map<K, Map<L, M>> right) { + final Map<K, Map<L, M>> difference = new HashMap<>(); difference.putAll(left); difference.putAll(right); difference.entrySet().removeAll(right.entrySet()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java index ce3b88ba03..63ab102d14 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/subscriptions/SubscriptionStatus.java @@ -20,36 +20,30 @@ package org.onap.cps.ncmp.api.impl.subscriptions; -import java.io.Serializable; -import java.util.Iterator; -import java.util.Map; public enum SubscriptionStatus { - ACCEPTED, - REJECTED, - PENDING; + ACCEPTED("ACCEPTED"), + REJECTED("REJECTED"), + PENDING("PENDING"); + private final String subscriptionStatusValue; + + SubscriptionStatus(final String subscriptionStatusValue) { + this.subscriptionStatusValue = subscriptionStatusValue; + } /** - * Populates a map with a key of cm handle id and a value of subscription status. + * Finds the value of the given enum. * - * @param resultMap the map is being populated - * @param bucketIterator to iterate over the collection + * @param statusValue value of the enum + * @return a SubscriptionStatus */ - public static void populateCmHandleToSubscriptionStatusMap(final Map<String, SubscriptionStatus> resultMap, - final Iterator<Serializable> bucketIterator) { - final String item = (String) bucketIterator.next(); - if ("PENDING".equals(item)) { - resultMap.put((String) bucketIterator.next(), - SubscriptionStatus.PENDING); - } - if ("REJECTED".equals(item)) { - resultMap.put((String) bucketIterator.next(), - SubscriptionStatus.REJECTED); - } - if ("ACCEPTED".equals(item)) { - resultMap.put((String) bucketIterator.next(), - SubscriptionStatus.ACCEPTED); + public static SubscriptionStatus fromString(final String statusValue) { + for (final SubscriptionStatus subscriptionStatusType : SubscriptionStatus.values()) { + if (subscriptionStatusType.subscriptionStatusValue.equalsIgnoreCase(statusValue)) { + return subscriptionStatusType; + } } + return null; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java index f42a378fcb..c032d1e8a4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/DataNodeHelper.java @@ -23,14 +23,12 @@ package org.onap.cps.ncmp.api.impl.utils; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.AccessLevel; import lombok.NoArgsConstructor; -import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.spi.model.DataNode; @NoArgsConstructor(access = AccessLevel.PRIVATE) @@ -50,8 +48,8 @@ public class DataNodeHelper { /** * The leaves for each DataNode is listed as map. * - * @param dataNodes as collection. - * @return list of map for the all leaves. + * @param dataNodes as collection + * @return list of map for the all leaves */ public static List<Map<String, Serializable>> getDataNodeLeaves(final Collection<DataNode> dataNodes) { return dataNodes.stream() @@ -61,47 +59,42 @@ public class DataNodeHelper { } /** - * The cm handle and status is listed as a collection. + * Extracts the mapping of cm handle id to status with details from nodes leaves. * - * @param dataNodeLeaves as a list of map. - * @return list of collection containing cm handle id and statuses. + * @param dataNodeLeaves as a list of map + * @return cm handle id to status and details mapping */ - public static List<Collection<Serializable>> getCmHandleIdToStatus( + public static Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMap( final List<Map<String, Serializable>> dataNodeLeaves) { return dataNodeLeaves.stream() - .map(Map::values) - .filter(col -> col.contains("PENDING") - || col.contains("ACCEPTED") - || col.contains("REJECTED")) - .collect(Collectors.toList()); - } + .filter(entryset -> entryset.values().contains("PENDING") + || entryset.values().contains("ACCEPTED") + || entryset.values().contains("REJECTED")) + .collect( + HashMap<String, Map<String, String>>::new, + (result, entry) -> { + final String cmHandleId = (String) entry.get("cmHandleId"); + final String status = (String) entry.get("status"); + final String details = (String) entry.get("details"); - /** - * The cm handle and status is returned as a map. - * - * @param cmHandleIdToStatus as a list of collection - * @return a map of cm handle id to status - */ - public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMap( - final List<Collection<Serializable>> cmHandleIdToStatus) { - final Map<String, SubscriptionStatus> resultMap = new HashMap<>(); - for (final Collection<Serializable> cmHandleToStatusBucket: cmHandleIdToStatus) { - final Iterator<Serializable> bucketIterator = cmHandleToStatusBucket.iterator(); - while (bucketIterator.hasNext()) { - SubscriptionStatus.populateCmHandleToSubscriptionStatusMap(resultMap, bucketIterator); - } - } - return resultMap; + if (cmHandleId != null && status != null) { + result.put(cmHandleId, new HashMap<>()); + result.get(cmHandleId).put("status", status); + result.get(cmHandleId).put("details", details == null ? "" : details); + } + }, + HashMap::putAll + ); } /** - * Extracts the mapping of cm handle id to status from data node collection. + * Extracts the mapping of cm handle id to status with details from data node collection. * * @param dataNodes as a collection - * @return cm handle id to status mapping + * @return cm handle id to status and details mapping */ - public static Map<String, SubscriptionStatus> getCmHandleIdToStatusMapFromDataNodes( + public static Map<String, Map<String, String>> cmHandleIdToStatusAndDetailsAsMapFromDataNode( final Collection<DataNode> dataNodes) { - return getCmHandleIdToStatusMap(getCmHandleIdToStatus(getDataNodeLeaves(dataNodes))); + return cmHandleIdToStatusAndDetailsAsMap(getDataNodeLeaves(dataNodes)); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java index a7de479046..df3998fe80 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java @@ -27,10 +27,12 @@ import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.jackson.PojoCloudEventDataMapper; import java.net.URI; +import java.util.UUID; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; +import org.onap.cps.spi.exceptions.CloudEventConstructionException; @NoArgsConstructor(access = AccessLevel.PRIVATE) @Slf4j @@ -38,6 +40,8 @@ public class SubscriptionEventCloudMapper { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static String randomId = UUID.randomUUID().toString(); + /** * Maps CloudEvent object to SubscriptionEvent. * @@ -62,18 +66,24 @@ public class SubscriptionEventCloudMapper { * * @param ncmpSubscriptionEvent object. * @param eventKey as String. - * @return CloudEvent builded. + * @return CloudEvent built. */ public static CloudEvent toCloudEvent( final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent, - final String eventKey) { + final String eventKey, final String eventType) { try { return CloudEventBuilder.v1() - .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent)) - .withId(eventKey).withType("CREATE").withSource( - URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID())).build(); + .withId(randomId) + .withSource(URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID())) + .withType(eventType) + .withExtension("correlationid", eventKey) + .withDataSchema(URI.create("urn:cps:" + + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi + .SubscriptionEvent.class.getName() + ":1.0.0")) + .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent)).build(); } catch (final Exception ex) { - throw new RuntimeException("The Cloud Event could not be constructed.", ex); + throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to " + + "serialize or required headers is missing", ex); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java new file mode 100644 index 0000000000..17aba65cf7 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventResponseCloudMapper.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.CloudEventUtils; +import io.cloudevents.core.data.PojoCloudEventData; +import io.cloudevents.jackson.PojoCloudEventDataMapper; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public class SubscriptionEventResponseCloudMapper { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Maps CloudEvent object to SubscriptionEventResponse. + * + * @param cloudEvent object + * @return SubscriptionEventResponse deserialized + */ + public static SubscriptionEventResponse toSubscriptionEventResponse(final CloudEvent cloudEvent) { + final PojoCloudEventData<SubscriptionEventResponse> deserializedCloudEvent = CloudEventUtils + .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEventResponse.class)); + if (deserializedCloudEvent == null) { + log.debug("No data found in the consumed subscription response event"); + return null; + } else { + final SubscriptionEventResponse subscriptionEventResponse = deserializedCloudEvent.getValue(); + log.debug("Consuming subscription response event {}", subscriptionEventResponse); + return subscriptionEventResponse; + } + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java new file mode 100644 index 0000000000..92c5656121 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionOutcomeCloudMapper.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import java.net.URI; +import java.util.UUID; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_client.SubscriptionEventOutcome; +import org.onap.cps.spi.exceptions.CloudEventConstructionException; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public class SubscriptionOutcomeCloudMapper { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static String randomId = UUID.randomUUID().toString(); + + /** + * Maps SubscriptionEventOutcome to a CloudEvent. + * + * @param subscriptionEventOutcome object + * @return CloudEvent + */ + public static CloudEvent toCloudEvent(final SubscriptionEventOutcome subscriptionEventOutcome, + final String eventKey, final String eventType) { + try { + return CloudEventBuilder.v1() + .withId(randomId) + .withSource(URI.create("NCMP")) + .withType(eventType) + .withExtension("correlationid", eventKey) + .withDataSchema(URI.create("urn:cps:" + SubscriptionEventOutcome.class.getName() + ":1.0.0")) + .withData(objectMapper.writeValueAsBytes(subscriptionEventOutcome)).build(); + } catch (final Exception ex) { + throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to " + + "serialize or required headers is missing", ex); + } + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java index 4dcc5797ca..866bfd4e7b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/yangmodels/YangModelSubscriptionEvent.java @@ -81,9 +81,18 @@ public class YangModelSubscriptionEvent { @JsonProperty() private final SubscriptionStatus status; + @JsonProperty() + private final String details; + + /** + * Constructor with single parameter for TargetCmHandle. + * + * @param cmHandleId as cm handle id + */ public TargetCmHandle(final String cmHandleId) { this.cmHandleId = cmHandleId; this.status = SubscriptionStatus.PENDING; + this.details = "Subscription forwarded to dmi plugin"; } } } |