diff options
Diffstat (limited to 'cps-ncmp-service')
10 files changed, 227 insertions, 75 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpEvent.java index 544db50a55..248db9805c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpEvent.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,30 +31,32 @@ import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter; import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext; import org.onap.cps.utils.JsonObjectMapper; -@Builder(buildMethodName = "setCloudEvent") -public class NcmpCloudEventBuilder { +@Builder +public class NcmpEvent { - private Object event; + private Object data; private Map<String, String> extensions; private String type; @Builder.Default - private static final String EVENT_SPEC_VERSION_V1 = "1.0.0"; + private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0"; + @Builder.Default + private static final String CLOUD_EVENT_SOURCE = "NCMP"; /** * Creates ncmp cloud event with provided attributes. * * @return Cloud Event */ - public CloudEvent build() { + public CloudEvent asCloudEvent() { final JsonObjectMapper jsonObjectMapper = CpsApplicationContext.getCpsBean(JsonObjectMapper.class); final CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSource(URI.create("NCMP")) + .withSource(URI.create(CLOUD_EVENT_SOURCE)) .withType(type) - .withDataSchema(URI.create("urn:cps:" + type + ":" + EVENT_SPEC_VERSION_V1)) + .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1)) .withTime(EventDateTimeFormatter.toIsoOffsetDateTime( EventDateTimeFormatter.getCurrentIsoFormattedDateTime())) - .withData(jsonObjectMapper.asJsonBytes(event)); + .withData(jsonObjectMapper.asJsonBytes(data)); extensions.entrySet().stream() .filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue())) .forEach(extensionEntry -> diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java index 9bd1119588..7afe606f4f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.Map; import lombok.RequiredArgsConstructor; import org.onap.cps.events.EventsPublisher; -import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder; +import org.onap.cps.ncmp.api.impl.events.NcmpEvent; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc; import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data; @@ -53,8 +53,8 @@ public class AvcEventPublisher { final Map<String, String> extensions = createAvcEventExtensions(eventKey); final CloudEvent avcCloudEvent = - NcmpCloudEventBuilder.builder().type(AvcEvent.class.getTypeName()) - .event(avcEvent).extensions(extensions).setCloudEvent().build(); + NcmpEvent.builder().type(AvcEvent.class.getTypeName()) + .data(avcEvent).extensions(extensions).build().asCloudEvent(); eventsPublisher.publishCloudEvent(avcTopic, eventKey, avcCloudEvent); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java index 8c1cac39dc..4b3a085147 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java @@ -119,8 +119,8 @@ public class DmiCmNotificationSubscriptionCacheHandler { for (final String cmHandle: cmHandles) { for (final String xpath: xpaths) { - cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType, - cmHandle, xpath, subscriptionId); + cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType, cmHandle, + xpath, subscriptionId); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java index 6b02adb654..3bb40c3b7e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java @@ -31,9 +31,9 @@ public interface CmNotificationSubscriptionPersistenceService { /** * Check if we have an ongoing cm subscription based on the parameters. * - * @param datastoreType valid datastore type - * @param cmHandleId cmhandle id - * @param xpath valid xpath + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath * @return true for ongoing cmsubscription , otherwise false */ boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, @@ -50,22 +50,35 @@ public interface CmNotificationSubscriptionPersistenceService { /** * Get all ongoing cm notification subscription based on the parameters. * - * @param datastoreType valid datastore type - * @param cmHandleId cmhandle id - * @param xpath valid xpath + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath * @return collection of subscription ids of ongoing cm notification subscription */ Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType, final String cmHandleId, final String xpath); /** - * Add or update cm notification subscription. + * Add cm notification subscription. * - * @param datastoreType valid datastore type - * @param cmHandle cmhandle id - * @param xpath valid xpath - * @param newSubscriptionId subscription Id to be added + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath + * @param newSubscriptionId subscription id to be added */ - void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandle, - final String xpath, final String newSubscriptionId); + void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String newSubscriptionId); + + /** + * Remove cm notification Subscription. + * + * @param datastoreType the susbcription target datastore type + * @param cmHandleId the id of the cm handle for the susbcription + * @param xpath the target xpath + * @param subscriptionId subscription id to remove + */ + void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String subscriptionId); + } + diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java index 2efd321b8d..92f3459187 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java @@ -24,7 +24,6 @@ import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS; import java.io.Serializable; import java.time.OffsetDateTime; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -60,7 +59,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif @Override public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath) { + final String xpath) { return !getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty(); } @@ -73,7 +72,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif @Override public Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType, - final String cmHandleId, final String xpath) { + final String cmHandleId, final String xpath) { final String isOngoingCmSubscriptionCpsPathQuery = CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, @@ -88,45 +87,77 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif } @Override - public void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, - final String xpath, final String newSubscriptionId) { - if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) { - final DataNode existingFilterNode = - cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, - CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, - escapeQuotesByDoublingThem(xpath)), - OMIT_DESCENDANTS).iterator().next(); - final Collection<String> existingSubscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType, + public void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String subscriptionId) { + if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath) + && (!getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath) + .contains(subscriptionId))) { + final DataNode subscriptionAsDataNode = getSubscriptionAsDataNode(datastoreType, cmHandleId, xpath); + final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath); - if (!existingSubscriptionIds.contains(newSubscriptionId)) { - updateListOfSubscribers(existingSubscriptionIds, newSubscriptionId, existingFilterNode); - } + subscriptionIds.add(subscriptionId); + saveSubscriptionDetails(subscriptionAsDataNode, subscriptionIds); + } else { + addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, subscriptionId); + } + } + + @Override + public void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId, + final String xpath, final String subscriptionId) { + final DataNode subscriptionAsDataNode = getSubscriptionAsDataNode(datastoreType, cmHandleId, xpath); + final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType, + cmHandleId, xpath); + subscriptionIds.remove(subscriptionId); + saveSubscriptionDetails(subscriptionAsDataNode, subscriptionIds); + if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) { + log.info("There are subscribers left for the following cps path {} :", + CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, + escapeQuotesByDoublingThem(xpath))); } else { - addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, newSubscriptionId); + log.info("No subscribers left for the following cps path {} :", + CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, + escapeQuotesByDoublingThem(xpath))); + deleteListOfSubscriptionsFor(datastoreType, cmHandleId, xpath); } } + private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId, + final String xpath) { + cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, + escapeQuotesByDoublingThem(xpath)), + OffsetDateTime.now()); + } + + private DataNode getSubscriptionAsDataNode(final DatastoreType datastoreType, final String cmHandleId, + final String xpath) { + return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId, + escapeQuotesByDoublingThem(xpath)), + OMIT_DESCENDANTS).iterator().next(); + } + private void addNewSubscriptionViaDatastore(final DatastoreType datastoreType, final String cmHandleId, final String xpath, final String newSubscriptionId) { final String parentXpath = "/datastores/datastore[@name='%s']/cm-handles" .formatted(datastoreType.getDatastoreName()); - final String updatedJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":" + final String subscriptionAsJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":" + "[{\"xpath\":\"%s\",\"subscriptionIds\":[\"%s\"]}]}}]}", cmHandleId, xpath, newSubscriptionId); - cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson, + cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson, OffsetDateTime.now(), ContentType.JSON); } - private void updateListOfSubscribers(final Collection<String> existingSubscriptionIds, - final String newSubscriptionId, final DataNode existingFilterNode) { - final String parentXpath = CpsPathUtil.getNormalizedParentXpath(existingFilterNode.getXpath()); - final List<String> updatedSubscribers = new ArrayList<>(existingSubscriptionIds); - updatedSubscribers.add(newSubscriptionId); - final Map<String, Serializable> updatedLeaves = new HashMap<>(); - updatedLeaves.put("xpath", existingFilterNode.getLeaves().get("xpath")); - updatedLeaves.put("subscriptionIds", (Serializable) updatedSubscribers); - final String updatedJson = "{\"filter\":[" + jsonObjectMapper.asJsonString(updatedLeaves) + "]}"; - cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson, - OffsetDateTime.now()); + private void saveSubscriptionDetails(final DataNode subscriptionDetailsAsDataNode, + final Collection<String> subscriptionIds) { + final Map<String, Serializable> subscriptionDetailsAsMap = new HashMap<>(); + subscriptionDetailsAsMap.put("xpath", subscriptionDetailsAsDataNode.getLeaves().get("xpath")); + subscriptionDetailsAsMap.put("subscriptionIds", (Serializable) subscriptionIds); + final String parentXpath = CpsPathUtil.getNormalizedParentXpath(subscriptionDetailsAsDataNode.getXpath()); + final String subscriptionDetailsAsJson = "{\"filter\":[" + + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap).replace("'", "\"") + "]}"; + cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, + subscriptionDetailsAsJson, OffsetDateTime.now()); } private static String escapeQuotesByDoublingThem(final String inputXpath) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java index 61da706c59..42bad89f52 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.NcmpResponseStatus; -import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder; +import org.onap.cps.ncmp.api.impl.events.NcmpEvent; import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation; import org.onap.cps.ncmp.events.async1_0_0.Data; import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; @@ -57,8 +57,8 @@ public class DataOperationEventCreator { final Data data = createPayloadFromDataOperationResponses(cmHandleIdsPerResponseCodesPerOperation); dataOperationEvent.setData(data); final Map<String, String> extensions = createDataOperationExtensions(requestId, clientTopic); - return NcmpCloudEventBuilder.builder().type(DataOperationEvent.class.getName()) - .event(dataOperationEvent).extensions(extensions).setCloudEvent().build(); + return NcmpEvent.builder().type(DataOperationEvent.class.getName()) + .data(dataOperationEvent).extensions(extensions).build().asCloudEvent(); } private static Data createPayloadFromDataOperationResponses(final MultiValueMap<DmiDataOperation, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java index a8b4e286b6..4b016b37d1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils { DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), CM_HANDLES_NOT_READY, nonReadyCmHandleIds); } - if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { - publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); - } + publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); return dmiDataOperationsOutPerDmiServiceName; } /** + * Handles the async task completion for an entire data, publishing errors to client topic on task failure. + * + * @param topicParamInQuery client given topic + * @param requestId unique identifier per request + * @param dataOperationRequest incoming data operation request details + * @param throwable error cause, or null if task completed with no exception + */ + public static void handleAsyncTaskCompletionForDataOperationsRequest( + final String topicParamInQuery, + final String requestId, + final DataOperationRequest dataOperationRequest, + final Throwable throwable) { + if (throwable == null) { + log.info("Data operations request {} completed.", requestId); + } else if (throwable instanceof TimeoutException) { + log.error("Data operations request {} timed out.", requestId); + ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, + requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING); + } else { + log.error("Data operations request {} failed.", requestId, throwable); + ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, + requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR); + } + } + + /** + * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic. + * + * @param topicParamInQuery client given topic + * @param requestId unique identifier per request + * @param dataOperationRequestIn incoming data operation request details + * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations + */ + private static void publishErrorMessageToClientTopicForEntireOperation( + final String topicParamInQuery, + final String requestId, + final DataOperationRequest dataOperationRequestIn, + final NcmpResponseStatus ncmpResponseStatus) { + + final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>> + cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>(); + + for (final DataOperationDefinition dataOperationDefinitionIn : + dataOperationRequestIn.getDataOperationDefinitions()) { + cmHandleIdsPerResponseCodesPerOperation.add( + DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), + Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds())); + } + publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); + } + + /** * Creates data operation cloud event and publish it to client topic. * * @param clientTopic client given topic * @param requestId unique identifier per request - * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code + * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code */ public static void publishErrorMessageToClientTopic(final String clientTopic, final String requestId, final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>> cmHandleIdsPerResponseCodesPerOperation) { - final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, - requestId, cmHandleIdsPerResponseCodesPerOperation); - final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); - eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); + if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { + final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, + requestId, cmHandleIdsPerResponseCodesPerOperation); + final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); + eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); + } } private static Map<String, String> getDmiServiceNamesPerCmHandleId( diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy index 47a1c89468..10e060fee6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy @@ -139,7 +139,7 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec { when: 'subscription is persisted in database' objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1') then: 'persistence service is called the correct number of times per dmi' - 4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId) + 4 * mockCmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(_,_,_,subscriptionId) } def setUpTestEvent(){ diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy index 19ebc3d711..13a20a1eb2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy @@ -71,12 +71,12 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification def 'Add new subscriber to an ongoing cm notification subscription'() { given: 'a valid cm subscription path query' - def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y'); + def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y') and: 'a dataNode exists for the given cps path query' mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1']])] when: 'the method to add/update cm notification subscription is called' - objectUnderTest.addOrUpdateCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId') + objectUnderTest.addCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId') then: 'data service method to update list of subscribers is called once' 1 * mockCpsDataService.updateNodeLeaves( 'NCMP-Admin', @@ -95,7 +95,7 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification cpsPathQuery.formatted(datastoreName), FetchDescendantsOption.OMIT_DESCENDANTS) >> [] when: 'the method to add/update cm notification subscription is called' - objectUnderTest.addOrUpdateCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId') + objectUnderTest.addCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId') then: 'data service method to update list of subscribers is called once with the correct parameters' 1 * mockCpsDataService.saveData( 'NCMP-Admin', @@ -107,4 +107,30 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification 'passthrough_running' | DatastoreType.PASSTHROUGH_RUNNING || "ncmp-datastore:passthrough-running" 'passthrough_operational' | DatastoreType.PASSTHROUGH_OPERATIONAL || "ncmp-datastore:passthrough-operational" } + + def 'Remove subscriber from a list of an ongoing cm notification subscription'() { + given: 'a subscription exists when queried' + def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y') + mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', + cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1', 'sub-2']])] + when: 'the subscriber is removed' + objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1') + then: 'the list of subscribers is updated' + 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-subscriptions', + '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters', + '{"filter":[{"xpath":"/x/y","subscriptionIds":["sub-2"]}]}', _) + } + + def 'Removing ongoing subscription with no subscribers'(){ + given: 'a subscription exists when queried but has no subscribers' + def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y') + mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions', + cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': []])] + when: 'a an ongoing subscription is refreshed' + objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1') + then: 'the subscription with empty subscriber list is removed' + 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions', + '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters/filter[@xpath=\'/x/y\']', + _) + } }
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy index 5690b8f214..8df27bb62c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy @@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.events.EventsPublisher +import org.onap.cps.ncmp.api.NcmpResponseStatus import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle import org.onap.cps.ncmp.api.impl.inventory.CmHandleState @@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration + import java.time.Duration +import java.util.concurrent.TimeoutException import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper]) class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { - def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer)) def static clientTopic = 'my-topic-name' def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' @@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() { given: 'consumer subscribing to client topic' + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer)) cloudEventKafkaConsumer.subscribe([clientTopic]) and: 'data operation request having non-ready and non-existing cm handle ids' def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') @@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { when: 'data operation request is processed' ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles) and: 'subscribed client specified topic is polled and first record is selected' - def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] + def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() then: 'verify cloud compliant headers' def consumerRecordOutHeaders = consumerRecordOut.headers() assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null @@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { and: 'data operation response event response size is 3' dataOperationResponseEvent.data.responses.size() == 3 and: 'verify published data operation response as json string' - def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json') + def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json') jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson } + def 'Publish error response for entire data operations request when async task fails'() { + given: 'consumer subscribing to client topic' + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer)) + cloudEventKafkaConsumer.subscribe([clientTopic]) + and: 'data operation request having non-ready and non-existing cm handle ids' + def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') + def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class) + when: 'an error occurs for the entire data operations request' + ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown) + and: 'subscribed client specified topic is polled and first record is selected' + def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() + def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) + then: 'data operation response event response size is 3' + dataOperationResponseEvent.data.responses.size() == 3 + and: 'all 3 have the expected error code' + dataOperationResponseEvent.data.responses.each { + assert it.statusCode == errorReportedToClientTopic.code + } + where: + scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic + 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING + 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR + } + static def getYangModelCmHandles() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() |