summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java69
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy33
2 files changed, 91 insertions, 11 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
index a8b4e286b6..4b016b37d1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils {
DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
CM_HANDLES_NOT_READY, nonReadyCmHandleIds);
}
- if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
- publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
- }
+ publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
return dmiDataOperationsOutPerDmiServiceName;
}
/**
+ * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
+ *
+ * @param topicParamInQuery client given topic
+ * @param requestId unique identifier per request
+ * @param dataOperationRequest incoming data operation request details
+ * @param throwable error cause, or null if task completed with no exception
+ */
+ public static void handleAsyncTaskCompletionForDataOperationsRequest(
+ final String topicParamInQuery,
+ final String requestId,
+ final DataOperationRequest dataOperationRequest,
+ final Throwable throwable) {
+ if (throwable == null) {
+ log.info("Data operations request {} completed.", requestId);
+ } else if (throwable instanceof TimeoutException) {
+ log.error("Data operations request {} timed out.", requestId);
+ ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+ requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
+ } else {
+ log.error("Data operations request {} failed.", requestId, throwable);
+ ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
+ requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
+ }
+ }
+
+ /**
+ * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
+ *
+ * @param topicParamInQuery client given topic
+ * @param requestId unique identifier per request
+ * @param dataOperationRequestIn incoming data operation request details
+ * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations
+ */
+ private static void publishErrorMessageToClientTopicForEntireOperation(
+ final String topicParamInQuery,
+ final String requestId,
+ final DataOperationRequest dataOperationRequestIn,
+ final NcmpResponseStatus ncmpResponseStatus) {
+
+ final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
+ cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
+
+ for (final DataOperationDefinition dataOperationDefinitionIn :
+ dataOperationRequestIn.getDataOperationDefinitions()) {
+ cmHandleIdsPerResponseCodesPerOperation.add(
+ DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
+ Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
+ }
+ publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
+ }
+
+ /**
* Creates data operation cloud event and publish it to client topic.
*
* @param clientTopic client given topic
* @param requestId unique identifier per request
- * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
+ * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
*/
public static void publishErrorMessageToClientTopic(final String clientTopic,
final String requestId,
final MultiValueMap<DmiDataOperation,
Map<NcmpResponseStatus, List<String>>>
cmHandleIdsPerResponseCodesPerOperation) {
- final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
- requestId, cmHandleIdsPerResponseCodesPerOperation);
- final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
- eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
+ final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
+ requestId, cmHandleIdsPerResponseCodesPerOperation);
+ final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+ eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ }
}
private static Map<String, String> getDmiServiceNamesPerCmHandleId(
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
index 5690b8f214..8df27bb62c 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
@@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.events.EventsPublisher
+import org.onap.cps.ncmp.api.NcmpResponseStatus
import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
@@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
+
import java.time.Duration
+import java.util.concurrent.TimeoutException
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
- def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'my-topic-name'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
@@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
cloudEventKafkaConsumer.subscribe([clientTopic])
and: 'data operation request having non-ready and non-existing cm handle ids'
def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
when: 'data operation request is processed'
ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles)
and: 'subscribed client specified topic is polled and first record is selected'
- def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
then: 'verify cloud compliant headers'
def consumerRecordOutHeaders = consumerRecordOut.headers()
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null
@@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
and: 'data operation response event response size is 3'
dataOperationResponseEvent.data.responses.size() == 3
and: 'verify published data operation response as json string'
- def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
+ def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
}
+ def 'Publish error response for entire data operations request when async task fails'() {
+ given: 'consumer subscribing to client topic'
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
+ cloudEventKafkaConsumer.subscribe([clientTopic])
+ and: 'data operation request having non-ready and non-existing cm handle ids'
+ def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
+ def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
+ when: 'an error occurs for the entire data operations request'
+ ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
+ and: 'subscribed client specified topic is polled and first record is selected'
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
+ def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
+ then: 'data operation response event response size is 3'
+ dataOperationResponseEvent.data.responses.size() == 3
+ and: 'all 3 have the expected error code'
+ dataOperationResponseEvent.data.responses.each {
+ assert it.statusCode == errorReportedToClientTopic.code
+ }
+ where:
+ scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic
+ 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
+ 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR
+ }
+
static def getYangModelCmHandles() {
def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()