diff options
Diffstat (limited to 'cps-ncmp-service/src')
2 files changed, 91 insertions, 11 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java index a8b4e286b6..4b016b37d1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -109,28 +110,80 @@ public class ResourceDataOperationRequestUtils { DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), CM_HANDLES_NOT_READY, nonReadyCmHandleIds); } - if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { - publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); - } + publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); return dmiDataOperationsOutPerDmiServiceName; } /** + * Handles the async task completion for an entire data, publishing errors to client topic on task failure. + * + * @param topicParamInQuery client given topic + * @param requestId unique identifier per request + * @param dataOperationRequest incoming data operation request details + * @param throwable error cause, or null if task completed with no exception + */ + public static void handleAsyncTaskCompletionForDataOperationsRequest( + final String topicParamInQuery, + final String requestId, + final DataOperationRequest dataOperationRequest, + final Throwable throwable) { + if (throwable == null) { + log.info("Data operations request {} completed.", requestId); + } else if (throwable instanceof TimeoutException) { + log.error("Data operations request {} timed out.", requestId); + ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, + requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING); + } else { + log.error("Data operations request {} failed.", requestId, throwable); + ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, + requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR); + } + } + + /** + * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic. + * + * @param topicParamInQuery client given topic + * @param requestId unique identifier per request + * @param dataOperationRequestIn incoming data operation request details + * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations + */ + private static void publishErrorMessageToClientTopicForEntireOperation( + final String topicParamInQuery, + final String requestId, + final DataOperationRequest dataOperationRequestIn, + final NcmpResponseStatus ncmpResponseStatus) { + + final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>> + cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>(); + + for (final DataOperationDefinition dataOperationDefinitionIn : + dataOperationRequestIn.getDataOperationDefinitions()) { + cmHandleIdsPerResponseCodesPerOperation.add( + DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), + Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds())); + } + publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); + } + + /** * Creates data operation cloud event and publish it to client topic. * * @param clientTopic client given topic * @param requestId unique identifier per request - * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code + * @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code */ public static void publishErrorMessageToClientTopic(final String clientTopic, final String requestId, final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>> cmHandleIdsPerResponseCodesPerOperation) { - final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, - requestId, cmHandleIdsPerResponseCodesPerOperation); - final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); - eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); + if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { + final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, + requestId, cmHandleIdsPerResponseCodesPerOperation); + final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); + eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); + } } private static Map<String, String> getDmiServiceNamesPerCmHandleId( diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy index 5690b8f214..8df27bb62c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy @@ -26,6 +26,7 @@ import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.events.EventsPublisher +import org.onap.cps.ncmp.api.NcmpResponseStatus import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle import org.onap.cps.ncmp.api.impl.inventory.CmHandleState @@ -38,14 +39,15 @@ import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration + import java.time.Duration +import java.util.concurrent.TimeoutException import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper]) class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { - def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer)) def static clientTopic = 'my-topic-name' def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' @@ -90,6 +92,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() { given: 'consumer subscribing to client topic' + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer)) cloudEventKafkaConsumer.subscribe([clientTopic]) and: 'data operation request having non-ready and non-existing cm handle ids' def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') @@ -97,7 +100,7 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { when: 'data operation request is processed' ResourceDataOperationRequestUtils.processPerDefinitionInDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, yangModelCmHandles) and: 'subscribed client specified topic is polled and first record is selected' - def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] + def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() then: 'verify cloud compliant headers' def consumerRecordOutHeaders = consumerRecordOut.headers() assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') != null @@ -111,10 +114,34 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { and: 'data operation response event response size is 3' dataOperationResponseEvent.data.responses.size() == 3 and: 'verify published data operation response as json string' - def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json') + def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json') jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson } + def 'Publish error response for entire data operations request when async task fails'() { + given: 'consumer subscribing to client topic' + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer)) + cloudEventKafkaConsumer.subscribe([clientTopic]) + and: 'data operation request having non-ready and non-existing cm handle ids' + def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') + def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class) + when: 'an error occurs for the entire data operations request' + ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown) + and: 'subscribed client specified topic is polled and first record is selected' + def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() + def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) + then: 'data operation response event response size is 3' + dataOperationResponseEvent.data.responses.size() == 3 + and: 'all 3 have the expected error code' + dataOperationResponseEvent.data.responses.each { + assert it.statusCode == errorReportedToClientTopic.code + } + where: + scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic + 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING + 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR + } + static def getYangModelCmHandles() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() |