summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-application/src/main/resources/application.yml4
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java16
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java25
-rw-r--r--cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy2
-rw-r--r--cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy9
-rw-r--r--cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy17
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpEvent.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java)20
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java39
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java89
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java69
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy32
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy33
-rw-r--r--docs/cps-ncmp-message-status-codes.rst4
-rw-r--r--docs/release-notes.rst5
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy41
-rw-r--r--integration-test/src/test/resources/application.yml6
20 files changed, 331 insertions, 100 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 3c263e3814..27bc6c6189 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -52,7 +52,7 @@ spring:
minimumIdle: 5
maximumPoolSize: 80
idleTimeout: 60000
- connectionTimeout: 120000
+ connectionTimeout: 30000
leakDetectionThreshold: 30000
pool-name: CpsDatabasePool
@@ -170,7 +170,7 @@ logging:
ncmp:
dmi:
httpclient:
- connectionTimeoutInSeconds: 180
+ connectionTimeoutInSeconds: 30
maximumConnectionsPerRoute: 50
maximumConnectionsTotal: 100
idleConnectionEvictionThresholdInSeconds: 5
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
index eca7ebfe31..1f87865f21 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
@@ -25,11 +25,13 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
import java.util.Map;
import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
import org.onap.cps.ncmp.api.impl.operations.OperationType;
+import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
import org.onap.cps.ncmp.api.models.CmResourceAddress;
import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
@@ -46,7 +48,7 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
private static final Object noReturn = null;
- private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 50;
+ private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 50000;
private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles";
@@ -99,8 +101,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
final DataOperationRequest dataOperationRequest,
final String authorization) {
final String requestId = UUID.randomUUID().toString();
- cpsNcmpTaskExecutor.executeTask(
+ cpsNcmpTaskExecutor.executeTaskWithErrorHandling(
getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization),
+ getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId),
timeOutInMilliSeconds);
return ResponseEntity.ok(Map.of("requestId", requestId));
}
@@ -139,4 +142,13 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
};
}
+ private static BiConsumer<Object, Throwable> getTaskCompletionHandlerForDataOperationRequest(
+ final String topicParamInQuery,
+ final DataOperationRequest dataOperationRequest,
+ final String requestId) {
+ return (result, throwable) ->
+ ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery,
+ requestId, dataOperationRequest, throwable);
+ }
+
}
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
index ba68d5b757..2601c7a5b3 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2022-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package org.onap.cps.ncmp.rest.executor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -32,15 +33,29 @@ import org.springframework.stereotype.Service;
public class CpsNcmpTaskExecutor {
/**
+ * Execute a task asynchronously, and invoke completion handler when done.
+ *
+ * @param taskSupplier functional method is get() task needed to be executed asynchronously
+ * @param taskCompletionHandler the action to perform on task completion or error
+ * @param timeOutInMillis the time-out value in milliseconds
+ */
+ public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier,
+ final BiConsumer<Object, Throwable> taskCompletionHandler,
+ final long timeOutInMillis) {
+ CompletableFuture.supplyAsync(taskSupplier)
+ .orTimeout(timeOutInMillis, MILLISECONDS)
+ .whenCompleteAsync(taskCompletionHandler);
+ }
+
+ /**
* Execute a task asynchronously.
*
- * @param taskSupplier functional method is get() task need to executed asynchronously
+ * @param taskSupplier functional method is get() task needed to be executed asynchronously
* @param timeOutInMillis the time-out value in milliseconds
*/
public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
- CompletableFuture.supplyAsync(taskSupplier::get)
- .orTimeout(timeOutInMillis, MILLISECONDS)
- .whenCompleteAsync((taskResult, throwable) -> handleTaskCompletion(throwable));
+ executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable),
+ timeOutInMillis);
}
private void handleTaskCompletion(final Throwable throwable) {
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
index a5b1f05ee1..2d7e9b2d03 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
@@ -198,7 +198,7 @@ class NetworkCmProxyControllerSpec extends Specification {
and: 'async request id is generated'
assert response.contentAsString.contains('requestId')
then: 'the request is handled asynchronously'
- 1 * mockCpsTaskExecutor.executeTask(*_)
+ 1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_)
where: 'the following data stores are used'
datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL]
}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
index aef37c91e4..641715d0d2 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
@@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
when: 'data operation request is executed'
objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER)
then: 'the task is executed in an async fashion or not'
- expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_)
+ expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
where: 'the following parameters are used'
scenario | notificationFeatureEnabled || expectedCalls
'on' | true || 1
@@ -101,7 +101,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
when: 'data operation request is executed'
objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
then: 'the task is executed in an async fashion'
- 1 * spiedCpsNcmpTaskExecutor.executeTask(*_)
+ 1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
and: 'the network service is invoked'
new PollingConditions().within(1) {
assert networkServiceMethodCalled == true
@@ -143,14 +143,15 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
def 'Attempt to execute async data operation request with too many cm handles.'() {
given: 'a data operation definition with too many cm handles'
- def cmHandleIds = new String[51]
+ def tooMany = objectUnderTest.MAXIMUM_CM_HANDLES_PER_OPERATION+1
+ def cmHandleIds = new String[tooMany]
def dataOperationDefinition = new DataOperationDefinition(operationId: 'abc', operation: 'read', datastore: 'ncmp-datastore:passthrough-running', cmHandleIds: cmHandleIds)
when: 'data operation request is executed'
objectUnderTest.executeRequest('someTopic', new DataOperationRequest(dataOperationDefinitions:[dataOperationDefinition]), NO_AUTH_HEADER)
then: 'a payload too large exception is thrown'
def exceptionThrown = thrown(PayloadTooLargeException)
and: 'the error message contains the offending number of cm handles'
- assert exceptionThrown.message == "Operation 'abc' affects too many (51) cm handles"
+ assert exceptionThrown.message == "Operation 'abc' affects too many (${tooMany}) cm handles"
}
}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy
index 010eda964d..4c8c40f4e6 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy
@@ -33,6 +33,7 @@ class CpsNcmpTaskExecutorSpec extends Specification {
def objectUnderTest = new CpsNcmpTaskExecutor()
def logger = Spy(ListAppender<ILoggingEvent>)
def enoughTime = 100
+ def notEnoughTime = 10
void setup() {
((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).addAppender(logger)
@@ -67,6 +68,18 @@ class CpsNcmpTaskExecutorSpec extends Specification {
assert loggingEvent.formattedMessage.contains('original exception message')
}
+ def 'Task times out.'() {
+ when: 'task is executed without enough time to complete'
+ objectUnderTest.executeTask(taskSupplierForLongRunningTask(), notEnoughTime)
+ then: 'an event is logged with level ERROR'
+ new PollingConditions().within(1) {
+ def loggingEvent = getLoggingEvent()
+ assert loggingEvent.level == Level.ERROR
+ }
+ and: 'a timeout error message is logged'
+ assert loggingEvent.formattedMessage.contains('java.util.concurrent.TimeoutException')
+ }
+
def taskSupplier() {
return () -> 'hello world'
}
@@ -75,6 +88,10 @@ class CpsNcmpTaskExecutorSpec extends Specification {
return () -> { throw new RuntimeException('original exception message') }
}
+ def taskSupplierForLongRunningTask() {
+ return () -> { sleep(enoughTime) }
+ }
+
def getLoggingEvent() {
return logger.list[0]
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpEvent.java
index 544db50a55..248db9805c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpCloudEventBuilder.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/NcmpEvent.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,30 +31,32 @@ import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext;
import org.onap.cps.utils.JsonObjectMapper;
-@Builder(buildMethodName = "setCloudEvent")
-public class NcmpCloudEventBuilder {
+@Builder
+public class NcmpEvent {
- private Object event;
+ private Object data;
private Map<String, String> extensions;
private String type;
@Builder.Default
- private static final String EVENT_SPEC_VERSION_V1 = "1.0.0";
+ private static final String CLOUD_EVENT_SPEC_VERSION_V1 = "1.0.0";
+ @Builder.Default
+ private static final String CLOUD_EVENT_SOURCE = "NCMP";
/**
* Creates ncmp cloud event with provided attributes.
*
* @return Cloud Event
*/
- public CloudEvent build() {
+ public CloudEvent asCloudEvent() {
final JsonObjectMapper jsonObjectMapper = CpsApplicationContext.getCpsBean(JsonObjectMapper.class);
final CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSource(URI.create("NCMP"))
+ .withSource(URI.create(CLOUD_EVENT_SOURCE))
.withType(type)
- .withDataSchema(URI.create("urn:cps:" + type + ":" + EVENT_SPEC_VERSION_V1))
+ .withDataSchema(URI.create("urn:cps:" + type + ":" + CLOUD_EVENT_SPEC_VERSION_V1))
.withTime(EventDateTimeFormatter.toIsoOffsetDateTime(
EventDateTimeFormatter.getCurrentIsoFormattedDateTime()))
- .withData(jsonObjectMapper.asJsonBytes(event));
+ .withData(jsonObjectMapper.asJsonBytes(data));
extensions.entrySet().stream()
.filter(extensionEntry -> StringUtils.isNotBlank(extensionEntry.getValue()))
.forEach(extensionEntry ->
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java
index 9bd1119588..7afe606f4f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/ncmptoclient/AvcEventPublisher.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.onap.cps.events.EventsPublisher;
-import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder;
+import org.onap.cps.ncmp.api.impl.events.NcmpEvent;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data;
@@ -53,8 +53,8 @@ public class AvcEventPublisher {
final Map<String, String> extensions = createAvcEventExtensions(eventKey);
final CloudEvent avcCloudEvent =
- NcmpCloudEventBuilder.builder().type(AvcEvent.class.getTypeName())
- .event(avcEvent).extensions(extensions).setCloudEvent().build();
+ NcmpEvent.builder().type(AvcEvent.class.getTypeName())
+ .data(avcEvent).extensions(extensions).build().asCloudEvent();
eventsPublisher.publishCloudEvent(avcTopic, eventKey, avcCloudEvent);
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
index 8c1cac39dc..4b3a085147 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
@@ -119,8 +119,8 @@ public class DmiCmNotificationSubscriptionCacheHandler {
for (final String cmHandle: cmHandles) {
for (final String xpath: xpaths) {
- cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,
- cmHandle, xpath, subscriptionId);
+ cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType, cmHandle,
+ xpath, subscriptionId);
}
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
index 6b02adb654..3bb40c3b7e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceService.java
@@ -31,9 +31,9 @@ public interface CmNotificationSubscriptionPersistenceService {
/**
* Check if we have an ongoing cm subscription based on the parameters.
*
- * @param datastoreType valid datastore type
- * @param cmHandleId cmhandle id
- * @param xpath valid xpath
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
* @return true for ongoing cmsubscription , otherwise false
*/
boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
@@ -50,22 +50,35 @@ public interface CmNotificationSubscriptionPersistenceService {
/**
* Get all ongoing cm notification subscription based on the parameters.
*
- * @param datastoreType valid datastore type
- * @param cmHandleId cmhandle id
- * @param xpath valid xpath
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
* @return collection of subscription ids of ongoing cm notification subscription
*/
Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
final String cmHandleId, final String xpath);
/**
- * Add or update cm notification subscription.
+ * Add cm notification subscription.
*
- * @param datastoreType valid datastore type
- * @param cmHandle cmhandle id
- * @param xpath valid xpath
- * @param newSubscriptionId subscription Id to be added
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @param newSubscriptionId subscription id to be added
*/
- void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandle,
- final String xpath, final String newSubscriptionId);
+ void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String newSubscriptionId);
+
+ /**
+ * Remove cm notification Subscription.
+ *
+ * @param datastoreType the susbcription target datastore type
+ * @param cmHandleId the id of the cm handle for the susbcription
+ * @param xpath the target xpath
+ * @param subscriptionId subscription id to remove
+ */
+ void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String subscriptionId);
+
}
+
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java
index 2efd321b8d..92f3459187 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImpl.java
@@ -24,7 +24,6 @@ import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS;
import java.io.Serializable;
import java.time.OffsetDateTime;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -60,7 +59,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
@Override
public boolean isOngoingCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath) {
+ final String xpath) {
return !getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath).isEmpty();
}
@@ -73,7 +72,7 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
@Override
public Collection<String> getOngoingCmNotificationSubscriptionIds(final DatastoreType datastoreType,
- final String cmHandleId, final String xpath) {
+ final String cmHandleId, final String xpath) {
final String isOngoingCmSubscriptionCpsPathQuery =
CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
@@ -88,45 +87,77 @@ public class CmNotificationSubscriptionPersistenceServiceImpl implements CmNotif
}
@Override
- public void addOrUpdateCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
- final String xpath, final String newSubscriptionId) {
- if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) {
- final DataNode existingFilterNode =
- cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
- CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
- escapeQuotesByDoublingThem(xpath)),
- OMIT_DESCENDANTS).iterator().next();
- final Collection<String> existingSubscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
+ public void addCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String subscriptionId) {
+ if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)
+ && (!getOngoingCmNotificationSubscriptionIds(datastoreType, cmHandleId, xpath)
+ .contains(subscriptionId))) {
+ final DataNode subscriptionAsDataNode = getSubscriptionAsDataNode(datastoreType, cmHandleId, xpath);
+ final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
cmHandleId, xpath);
- if (!existingSubscriptionIds.contains(newSubscriptionId)) {
- updateListOfSubscribers(existingSubscriptionIds, newSubscriptionId, existingFilterNode);
- }
+ subscriptionIds.add(subscriptionId);
+ saveSubscriptionDetails(subscriptionAsDataNode, subscriptionIds);
+ } else {
+ addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, subscriptionId);
+ }
+ }
+
+ @Override
+ public void removeCmNotificationSubscription(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath, final String subscriptionId) {
+ final DataNode subscriptionAsDataNode = getSubscriptionAsDataNode(datastoreType, cmHandleId, xpath);
+ final Collection<String> subscriptionIds = getOngoingCmNotificationSubscriptionIds(datastoreType,
+ cmHandleId, xpath);
+ subscriptionIds.remove(subscriptionId);
+ saveSubscriptionDetails(subscriptionAsDataNode, subscriptionIds);
+ if (isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)) {
+ log.info("There are subscribers left for the following cps path {} :",
+ CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+ escapeQuotesByDoublingThem(xpath)));
} else {
- addNewSubscriptionViaDatastore(datastoreType, cmHandleId, xpath, newSubscriptionId);
+ log.info("No subscribers left for the following cps path {} :",
+ CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+ escapeQuotesByDoublingThem(xpath)));
+ deleteListOfSubscriptionsFor(datastoreType, cmHandleId, xpath);
}
}
+ private void deleteListOfSubscriptionsFor(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath) {
+ cpsDataService.deleteDataNode(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+ escapeQuotesByDoublingThem(xpath)),
+ OffsetDateTime.now());
+ }
+
+ private DataNode getSubscriptionAsDataNode(final DatastoreType datastoreType, final String cmHandleId,
+ final String xpath) {
+ return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME,
+ CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted(datastoreType.getDatastoreName(), cmHandleId,
+ escapeQuotesByDoublingThem(xpath)),
+ OMIT_DESCENDANTS).iterator().next();
+ }
+
private void addNewSubscriptionViaDatastore(final DatastoreType datastoreType, final String cmHandleId,
final String xpath, final String newSubscriptionId) {
final String parentXpath = "/datastores/datastore[@name='%s']/cm-handles"
.formatted(datastoreType.getDatastoreName());
- final String updatedJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":"
+ final String subscriptionAsJson = String.format("{\"cm-handle\":[{\"id\":\"%s\",\"filters\":{\"filter\":"
+ "[{\"xpath\":\"%s\",\"subscriptionIds\":[\"%s\"]}]}}]}", cmHandleId, xpath, newSubscriptionId);
- cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson,
+ cpsDataService.saveData(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, subscriptionAsJson,
OffsetDateTime.now(), ContentType.JSON);
}
- private void updateListOfSubscribers(final Collection<String> existingSubscriptionIds,
- final String newSubscriptionId, final DataNode existingFilterNode) {
- final String parentXpath = CpsPathUtil.getNormalizedParentXpath(existingFilterNode.getXpath());
- final List<String> updatedSubscribers = new ArrayList<>(existingSubscriptionIds);
- updatedSubscribers.add(newSubscriptionId);
- final Map<String, Serializable> updatedLeaves = new HashMap<>();
- updatedLeaves.put("xpath", existingFilterNode.getLeaves().get("xpath"));
- updatedLeaves.put("subscriptionIds", (Serializable) updatedSubscribers);
- final String updatedJson = "{\"filter\":[" + jsonObjectMapper.asJsonString(updatedLeaves) + "]}";
- cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath, updatedJson,
- OffsetDateTime.now());
+ private void saveSubscriptionDetails(final DataNode subscriptionDetailsAsDataNode,
+ final Collection<String> subscriptionIds) {
+ final Map<String, Serializable> subscriptionDetailsAsMap = new HashMap<>();
+ subscriptionDetailsAsMap.put("xpath", subscriptionDetailsAsDataNode.getLeaves().get("xpath"));
+ subscriptionDetailsAsMap.put("subscriptionIds", (Serializable) subscriptionIds);
+ final String parentXpath = CpsPathUtil.getNormalizedParentXpath(subscriptionDetailsAsDataNode.getXpath());
+ final String subscriptionDetailsAsJson = "{\"filter\":["
+ + jsonObjectMapper.asJsonString(subscriptionDetailsAsMap).replace("'", "\"") + "]}";
+ cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, parentXpath,
+ subscriptionDetailsAsJson, OffsetDateTime.now());
}
private static String escapeQuotesByDoublingThem(final String inputXpath) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java
index 61da706c59..42bad89f52 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/DataOperationEventCreator.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
-import org.onap.cps.ncmp.api.impl.events.NcmpCloudEventBuilder;
+import org.onap.cps.ncmp.api.impl.events.NcmpEvent;
import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation;
import org.onap.cps.ncmp.events.async1_0_0.Data;
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
@@ -57,8 +57,8 @@ public class DataOperationEventCreator {
final Data data = createPayloadFromDataOperationResponses(cmHandleIdsPerResponseCodesPerOperation);
dataOperationEvent.setData(data);
final Map<String, String> extensions = createDataOperationExtensions(requestId, clientTopic);
- return NcmpCloudEventBuilder.builder().type(DataOperationEvent.class.getName())
- .event(dataOperationEvent).extensions(extensions).setCloudEvent().build();
+ return NcmpEvent.builder().type(DataOperationEvent.class.getName())
+ .data(dataOperationEvent).extensions(extensions).build().asCloudEvent();
}
private static Data createPayloadFromDataOperationResponses(final MultiValueMap<DmiDataOperation,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
index a8b4e286b6..4b016b37d1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils {
DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
CM_HANDLES_NOT_READY, nonReadyCmHandleIds);
}
- if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
- publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
- }
+ publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
return dmiDataOperationsOutPerDmiServiceName;
}
/**
+ * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
+ *
+ * @param topicParamInQuery client given topic
+ * @param requestId unique identifier per request
+ * @param dataOperationRequest incoming data operation request details
+ * @param throwable error cause, or null if task completed with no exception
+ */
+ public static void handleAsyncTaskCompletionForDataOperationsRequest(
+ final String topicParamInQuery,
+ final String requestId,
+ final DataOperationRequest dataOperationRequest,
+ final Throwable throwable) {
+ if (throwable == null) {
+ log.info("Data operations request {} completed.", requestId);
+ } else if (throwable instanceof TimeoutException) {
+ log.error("Data operations request {} timed out.", requestId);
+ ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+ requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
+ } else {
+ log.error("Data operations request {} failed.", requestId, throwable);
+ ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+ requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
+ }
+ }
+
+ /**
+ * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
+ *
+ * @param topicParamInQuery client given topic
+ * @param requestId unique identifier per request
+ * @param dataOperationRequestIn incoming data operation request details
+ * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations
+ */
+ private static void publishErrorMessageToClientTopicForEntireOperation(
+ final String topicParamInQuery,
+ final String requestId,
+ final DataOperationRequest dataOperationRequestIn,
+ final NcmpResponseStatus ncmpResponseStatus) {
+
+ final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
+ cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
+
+ for (final DataOperationDefinition dataOperationDefinitionIn :
+ dataOperationRequestIn.getDataOperationDefinitions()) {
+ cmHandleIdsPerResponseCodesPerOperation.add(
+ DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
+ Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
+ }
+ publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
+ }
+
+ /**
* Creates data operation cloud event and publish it to client topic.
*
* @param clientTopic client given topic
* @param requestId unique identifier per request
- * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
+ * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
*/
public static void publishErrorMessageToClientTopic(final String clientTopic,
final String requestId,
final MultiValueMap<DmiDataOperation,
Map<NcmpResponseStatus, List<String>>>
cmHandleIdsPerResponseCodesPerOperation) {
- final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
- requestId, cmHandleIdsPerResponseCodesPerOperation);
- final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
- eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
+ final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
+ requestId, cmHandleIdsPerResponseCodesPerOperation);
+ final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+ eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ }
}
private static Map<String, String> getDmiServiceNamesPerCmHandleId(
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
index 47a1c89468..10e060fee6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
@@ -139,7 +139,7 @@ class DmiCmNotificationSubscriptionCacheHandlerSpec extends MessagingBaseSpec {
when: 'subscription is persisted in database'
objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
then: 'persistence service is called the correct number of times per dmi'
- 4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId)
+ 4 * mockCmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(_,_,_,subscriptionId)
}
def setUpTestEvent(){
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy
index 19ebc3d711..13a20a1eb2 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/service/CmNotificationSubscriptionPersistenceServiceImplSpec.groovy
@@ -71,12 +71,12 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
def 'Add new subscriber to an ongoing cm notification subscription'() {
given: 'a valid cm subscription path query'
- def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y');
+ def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
and: 'a dataNode exists for the given cps path query'
mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1']])]
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addOrUpdateCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId')
+ objectUnderTest.addCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1','/x/y', 'newSubId')
then: 'data service method to update list of subscribers is called once'
1 * mockCpsDataService.updateNodeLeaves(
'NCMP-Admin',
@@ -95,7 +95,7 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
cpsPathQuery.formatted(datastoreName),
FetchDescendantsOption.OMIT_DESCENDANTS) >> []
when: 'the method to add/update cm notification subscription is called'
- objectUnderTest.addOrUpdateCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId')
+ objectUnderTest.addCmNotificationSubscription(datastoreType, 'ch-1','/x/y', 'newSubId')
then: 'data service method to update list of subscribers is called once with the correct parameters'
1 * mockCpsDataService.saveData(
'NCMP-Admin',
@@ -107,4 +107,30 @@ class CmNotificationSubscriptionPersistenceServiceImplSpec extends Specification
'passthrough_running' | DatastoreType.PASSTHROUGH_RUNNING || "ncmp-datastore:passthrough-running"
'passthrough_operational' | DatastoreType.PASSTHROUGH_OPERATIONAL || "ncmp-datastore:passthrough-operational"
}
+
+ def 'Remove subscriber from a list of an ongoing cm notification subscription'() {
+ given: 'a subscription exists when queried'
+ def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
+ cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': ['sub-1', 'sub-2']])]
+ when: 'the subscriber is removed'
+ objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+ then: 'the list of subscribers is updated'
+ 1 * mockCpsDataService.updateNodeLeaves('NCMP-Admin', 'cm-data-subscriptions',
+ '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters',
+ '{"filter":[{"xpath":"/x/y","subscriptionIds":["sub-2"]}]}', _)
+ }
+
+ def 'Removing ongoing subscription with no subscribers'(){
+ given: 'a subscription exists when queried but has no subscribers'
+ def cpsPathQuery = objectUnderTest.CM_SUBSCRIPTION_CPS_PATH_QUERY.formatted('ncmp-datastore:passthrough-running', 'ch-1', '/x/y')
+ mockCpsQueryService.queryDataNodes('NCMP-Admin', 'cm-data-subscriptions',
+ cpsPathQuery, FetchDescendantsOption.OMIT_DESCENDANTS) >> [new DataNode(xpath: cpsPathQuery, leaves: ['xpath': '/x/y','subscriptionIds': []])]
+ when: 'a an ongoing subscription is refreshed'
+ objectUnderTest.removeCmNotificationSubscription(DatastoreType.PASSTHROUGH_RUNNING, 'ch-1', '/x/y', 'sub-1')
+ then: 'the subscription with empty subscriber list is removed'
+ 1 * mockCpsDataService.deleteDataNode('NCMP-Admin', 'cm-data-subscriptions',
+ '/datastores/datastore[@name=\'ncmp-datastore:passthrough-running\']/cm-handles/cm-handle[@id=\'ch-1\']/filters/filter[@xpath=\'/x/y\']',
+ _)
+ }
} \ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
index 5690b8f214..8df27bb62c 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.api.NcmpResponseStatus
import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
+
import java.time.Duration
+import java.util.concurrent.TimeoutException
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
- def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'my-topic-name'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
@@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
cloudEventKafkaConsumer.subscribe([clientTopic])
and: 'data operation request having non-ready and non-existing cm handle ids'
def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
when: 'data operation request is processed'
ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles)
and: 'subscribed client specified topic is polled and first record is selected'
- def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
then: 'verify cloud compliant headers'
def consumerRecordOutHeaders = consumerRecordOut.headers()
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null
@@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
and: 'data operation response event response size is 3'
dataOperationResponseEvent.data.responses.size() == 3
and: 'verify published data operation response as json string'
- def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
+ def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
}
+ def 'Publish error response for entire data operations request when async task fails'() {
+ given: 'consumer subscribing to client topic'
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
+ cloudEventKafkaConsumer.subscribe([clientTopic])
+ and: 'data operation request having non-ready and non-existing cm handle ids'
+ def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
+ def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
+ when: 'an error occurs for the entire data operations request'
+ ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
+ and: 'subscribed client specified topic is polled and first record is selected'
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
+ def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
+ then: 'data operation response event response size is 3'
+ dataOperationResponseEvent.data.responses.size() == 3
+ and: 'all 3 have the expected error code'
+ dataOperationResponseEvent.data.responses.each {
+ assert it.statusCode == errorReportedToClientTopic.code
+ }
+ where:
+ scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic
+ 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
+ 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR
+ }
+
static def getYangModelCmHandles() {
def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
diff --git a/docs/cps-ncmp-message-status-codes.rst b/docs/cps-ncmp-message-status-codes.rst
index 018cf4a053..0c6ce0e712 100644
--- a/docs/cps-ncmp-message-status-codes.rst
+++ b/docs/cps-ncmp-message-status-codes.rst
@@ -16,7 +16,7 @@ CPS-NCMP Message Status Codes
+-----------------+------------------------------------------------------+-----------------------------------+
| 1 | successfully applied subscription | CM Data Notification Subscription |
+-----------------+------------------------------------------------------+-----------------------------------+
- | 100 | cm handle id(s) is(are) not found | Data Operation, Inventory |
+ | 100 | cm handle id(s) is(are) not found | All features |
+-----------------+------------------------------------------------------+-----------------------------------+
| 101 | cm handle(s) not ready | Data Operation |
+-----------------+------------------------------------------------------+-----------------------------------+
@@ -32,7 +32,7 @@ CPS-NCMP Message Status Codes
+-----------------+------------------------------------------------------+-----------------------------------+
| 107 | southbound system is busy | Data Operation |
+-----------------+------------------------------------------------------+-----------------------------------+
- | 108 | Unknown error | Inventory |
+ | 108 | Unknown error | All features |
+-----------------+------------------------------------------------------+-----------------------------------+
| 109 | cm-handle already exists | Inventory |
+-----------------+------------------------------------------------------+-----------------------------------+
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index d35ed99c31..f04f977c4c 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -36,6 +36,11 @@ Release Data
| | |
+--------------------------------------+--------------------------------------------------------+
+Bug Fixes
+---------
+3.4.8
+ - `CPS-2186 <https://jira.onap.org/browse/CPS-2186>`_ Report async task failures to client topic during data operations request
+
Features
--------
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy
index df74a05b50..9129f09fb5 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/NcmpCmNotificationSubscriptionSpec.groovy
@@ -18,7 +18,7 @@ class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase {
assert cmNotificationSubscriptionPersistenceService.
getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 0
when: 'we add a new cm notification subscription'
- cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath,
+ cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType,cmHandleId,xpath,
'subId-1')
then: 'there is an ongoing cm subscription for that CM handle and xpath'
assert cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType,cmHandleId,xpath)
@@ -27,15 +27,13 @@ class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase {
getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 1
}
- def 'Adding a cm notification subscription to an already existing'() {
- given: 'an ongoing cm subscription'
+ def 'Adding a cm notification subscription to the already existing'() {
+ given: 'an ongoing cm subscription with the following details'
def datastoreType = PASSTHROUGH_RUNNING
def cmHandleId = 'ch-1'
def xpath = '/x/y'
- cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath,
- 'subId-1')
when: 'a new cm notification subscription is made for the SAME CM handle and xpath'
- cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,cmHandleId,xpath,
+ cmNotificationSubscriptionPersistenceService.addCmNotificationSubscription(datastoreType,cmHandleId,xpath,
'subId-2')
then: 'it is added to the ongoing list of subscription ids'
def subscriptionIds = cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath)
@@ -43,4 +41,35 @@ class NcmpCmNotificationSubscriptionSpec extends CpsIntegrationSpecBase {
and: 'both subscription ids exists for the CM handle and xpath'
assert subscriptionIds.contains("subId-1") && subscriptionIds.contains("subId-2")
}
+
+ def 'Removing cm notification subscriber among other subscribers'() {
+ given: 'an ongoing cm subscription with the following details'
+ def datastoreType = PASSTHROUGH_RUNNING
+ def cmHandleId = 'ch-1'
+ def xpath = '/x/y'
+ and: 'the number of subscribers is as follows'
+ def originalNumberOfSubscribers =
+ cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size()
+ when: 'a subscriber is removed'
+ cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType,cmHandleId,xpath,'subId-2')
+ then: 'the number of subscribers is reduced by 1'
+ def updatedNumberOfSubscribers =
+ cmNotificationSubscriptionPersistenceService.getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size()
+ assert updatedNumberOfSubscribers == originalNumberOfSubscribers-1
+ }
+
+ def 'Removing the LAST cm notification subscriber for a given cm handle, datastore and xpath'() {
+ given: 'an ongoing cm subscription with the following details'
+ def datastoreType = PASSTHROUGH_RUNNING
+ def cmHandleId = 'ch-1'
+ def xpath = '/x/y'
+ and: 'there is only one subscriber'
+ assert cmNotificationSubscriptionPersistenceService
+ .getOngoingCmNotificationSubscriptionIds(datastoreType,cmHandleId,xpath).size() == 1
+ when: 'only subscriber is removed'
+ cmNotificationSubscriptionPersistenceService.removeCmNotificationSubscription(datastoreType,cmHandleId,xpath,'subId-1')
+ then: 'there are no longer any subscriptions for the cm handle, datastore and xpath'
+ assert !cmNotificationSubscriptionPersistenceService.isOngoingCmNotificationSubscription(datastoreType, cmHandleId, xpath)
+ }
+
}
diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml
index 3d61bdbea6..6fd3bcae4e 100644
--- a/integration-test/src/test/resources/application.yml
+++ b/integration-test/src/test/resources/application.yml
@@ -48,7 +48,7 @@ spring:
minimumIdle: 5
maximumPoolSize: 80
idleTimeout: 60000
- connectionTimeout: 120000
+ connectionTimeout: 30000
leakDetectionThreshold: 30000
pool-name: CpsDatabasePool
@@ -120,7 +120,7 @@ notification:
queue-capacity: 500
wait-for-tasks-to-complete-on-shutdown: true
thread-name-prefix: Async-
- time-out-value-in-ms: 2000
+ time-out-value-in-ms: 60000
springdoc:
swagger-ui:
@@ -165,7 +165,7 @@ logging:
ncmp:
dmi:
httpclient:
- connectionTimeoutInSeconds: 180
+ connectionTimeoutInSeconds: 30
maximumConnectionsPerRoute: 50
maximumConnectionsTotal: 100
idleConnectionEvictionThresholdInSeconds: 5