From 04dbe3800a0a9f9809cff2da59a31904a26f17ce Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Fri, 7 Jun 2024 19:10:45 +0100 Subject: #1: Used async version of web client for batch read operation - Exposed async version of post method into dmi rest client. - Code change is done to use async web client for batch data operation. - Use of CpsNcmpTaskExecutor code is removed. Issue-ID: CPS-2174 Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72 Signed-off-by: sourabh_sourabh --- .../impl/operations/DmiDataOperationsSpec.groovy | 20 ++++---- .../ResourceDataOperationRequestUtilsSpec.groovy | 58 ++++++++-------------- 2 files changed, 31 insertions(+), 47 deletions(-) (limited to 'cps-ncmp-service/src/test') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy index ad3f85c84..a861809c6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy @@ -21,6 +21,8 @@ package org.onap.cps.ncmp.api.impl.operations +import reactor.core.publisher.Mono + import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING @@ -100,14 +102,14 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId] and: 'a positive response from DMI service when it is called with valid request parameters' - def responseFromDmi = new ResponseEntity(HttpStatus.ACCEPTED) + def responseFromDmi = Mono.just(new ResponseEntity(HttpStatus.ACCEPTED)) def expectedDmiBatchResourceDataUrl = "someServiceName/dmi/v1/data?requestId=requestId&topic=my-topic-name" def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","moduleSetTag":"","cmHandleProperties":{"prop1":"val1"}}]}]}' - mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi - when: 'get resource data for group of cm handles are invoked' + mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi + when: 'get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER) - then: 'the post operation was called and ncmp generated dmi request body json args' - 1 * mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER) + then: 'the post operation was called with the expected URL and JSON request body' + 1 * mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER) } def 'Execute (async) data operation from DMI service with Exception.'() { @@ -116,12 +118,12 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId] - and: 'the published cloud will be captured' + and: 'the published cloud event will be captured' def actualDataOperationCloudEvent = null eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] } - and: 'a positive response from DMI service when it is called with valid request parameters' - mockDmiRestClient.postOperationWithJsonData(*_) >> { throw new DmiClientRequestException(123,'','', UNKNOWN_ERROR) } - when: 'attempt tp get resource data for group of cm handles are invoked' + and: 'a DMI client request exception is thrown when DMI service is called' + mockDmiRestClient.postOperationWithJsonDataAsync(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) } + when: 'attempt to get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'my-request-id', NO_AUTH_HEADER) then: 'the event contains the expected error details' def eventDataValue = extractDataValue(actualDataOperationCloudEvent) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy index 9028b9e5e..653068592 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy @@ -20,16 +20,20 @@ package org.onap.cps.ncmp.api.impl.utils.data.operation +import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent +import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.ADVISED +import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.READY + import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.events.EventsPublisher -import org.onap.cps.ncmp.api.NcmpResponseStatus +import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation +import org.onap.cps.ncmp.api.impl.operations.OperationType import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.impl.inventory.CmHandleState import org.onap.cps.ncmp.api.impl.inventory.CompositeStateBuilder import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.api.models.DataOperationRequest @@ -37,15 +41,11 @@ import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean -import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration - +import org.springframework.util.LinkedMultiValueMap import java.time.Duration -import java.util.concurrent.TimeoutException - -import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent -@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper]) +@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext]) class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { def static clientTopic = 'my-topic-name' @@ -57,9 +57,6 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { @SpringBean EventsPublisher eventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - @Autowired - ObjectMapper objectMapper - def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') @@ -135,34 +132,10 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson } - def 'Publish error response for entire data operations request when async task fails'() { - given: 'consumer subscribing to client topic' - def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer)) - cloudEventKafkaConsumer.subscribe([clientTopic]) - and: 'data operation request having non-ready and non-existing cm handle ids' - def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') - def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class) - when: 'an error occurs for the entire data operations request' - ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown) - and: 'subscribed client specified topic is polled and first record is selected' - def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() - def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) - then: 'data operation response event response size is 3' - dataOperationResponseEvent.data.responses.size() == 3 - and: 'all 3 have the expected error code' - dataOperationResponseEvent.data.responses.each { - assert it.statusCode == errorReportedToClientTopic.code - } - where: - scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic - 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING - 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR - } - static def getYangModelCmHandles() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] - def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() - def advisedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).withLastUpdatedTimeNow().build() + def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build() + def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build() return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), @@ -176,7 +149,16 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { static def getYangModelCmHandlesForOneCmHandle() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] - def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() + def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build() return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)] } + + def mockAndPopulateErrorMap(errorReportedToClientTopic) { + def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read')) + .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational') + .options('some-option').resourceIdentifier('some-resource-identifier').build() + def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>() + cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id'])) + return cmHandleIdsPerResponseCodesPerOperation + } } -- cgit 1.2.3-korg