diff options
author | sourabh_sourabh <sourabh.sourabh@est.tech> | 2024-06-07 19:10:45 +0100 |
---|---|---|
committer | sourabh_sourabh <sourabh.sourabh@est.tech> | 2024-06-13 10:34:42 +0100 |
commit | 04dbe3800a0a9f9809cff2da59a31904a26f17ce (patch) | |
tree | eae0353400a41c1ddd603c1ce5799f739c4d5233 /cps-ncmp-service | |
parent | d3e64201a957ca4a1538ea0962c3e5218a5d34e8 (diff) |
#1: Used async version of web client for batch read operation
- Exposed async version of post method into dmi rest client.
- Code change is done to use async web client for batch data
operation.
- Use of CpsNcmpTaskExecutor code is removed.
Issue-ID: CPS-2174
Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Diffstat (limited to 'cps-ncmp-service')
7 files changed, 105 insertions, 134 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index 6aa09767be..17b3d7ab1e 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService final DataOperationRequest dataOperationRequest, final String requestId, final String authorization) { - dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, - authorization); + dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization); } @Override diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java index 7878c5d0ba..5811cf97da 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java @@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; @Component @RequiredArgsConstructor @@ -85,23 +86,41 @@ public class DmiRestClient { final String requestBodyAsJsonString, final OperationType operationType, final String authorization) { - final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA) - ? dataServicesWebClient : modelServicesWebClient; try { - return webClient.post() - .uri(toUri(dmiUrl)) - .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) - .body(BodyInserters.fromValue(requestBodyAsJsonString)) - .retrieve() - .toEntity(Object.class) - .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName())) - .block(); + return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType, + authorization).block(); } catch (final HttpServerErrorException e) { throw handleDmiClientException(e, operationType.getOperationName()); } } /** + * Asynchronously performs an HTTP POST operation with the given JSON data. + * + * @param requiredDmiService The service object required for retrieving or configuring the WebClient. + * @param dmiUrl The URL to which the POST request is sent. + * @param requestBodyAsJsonString The JSON string that will be sent as the request body. + * @param operationType An enumeration or object that holds information about the type of operation + * being performed. + * @param authorization The authorization token to be added to the request headers. + * @return A Mono emitting the response entity containing the server's response. + */ + public Mono<ResponseEntity<Object>> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService, + final String dmiUrl, + final String requestBodyAsJsonString, + final OperationType operationType, + final String authorization) { + final WebClient webClient = getWebClient(requiredDmiService); + return webClient.post() + .uri(toUri(dmiUrl)) + .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) + .body(BodyInserters.fromValue(requestBodyAsJsonString)) + .retrieve() + .toEntity(Object.class) + .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName())); + } + + /** * Get DMI plugin health status. * * @param dmiUrl the base URL of the dmi-plugin @@ -123,6 +142,10 @@ public class DmiRestClient { } } + private WebClient getWebClient(final RequiredDmiService requiredDmiService) { + return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient; + } + private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) { if (dmiProperties.isDmiBasicAuthEnabled()) { httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java index 2c0b702627..08885a9e04 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java @@ -106,13 +106,11 @@ public class DmiWebClientConfiguration { final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName, maximumConnectionsTotal); - final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider) + return HttpClient.create(dmiWebClientConnectionProvider) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000) .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds, TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds, TimeUnit.SECONDS))); - httpClient.warmup().block(); - return httpClient; } private static WebClient buildAndGetWebClient(final HttpClient httpClient, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java index 978855569a..786160a964 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.NcmpResponseStatus; import org.onap.cps.ncmp.api.impl.client.DmiRestClient; import org.onap.cps.ncmp.api.impl.config.DmiProperties; @@ -51,13 +50,14 @@ import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Operations class for DMI data. */ @RequiredArgsConstructor @Service -@Slf4j public class DmiDataOperations { private final InventoryPersistence inventoryPersistence; @@ -231,28 +231,46 @@ public class DmiDataOperations { groupsOutPerDmiServiceName, final String authorization) { - groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> { - final String dmiUrl = DmiServiceUrlBuilder.newInstance() + Flux.fromIterable(groupsOutPerDmiServiceName.entrySet()) + .flatMap(dmiDataOperationsByDmiServiceName -> { + final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey(); + final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery); + final List<DmiDataOperation> dmiDataOperationRequestBodies + = dmiDataOperationsByDmiServiceName.getValue(); + return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); + }) + .subscribe(); + } + + private String buildDmiServiceUrl(final String dmiServiceName, final String requestId, + final String topicParamInQuery) { + return DmiServiceUrlBuilder.newInstance() .pathSegment("data") .queryParameter("requestId", requestId) .queryParameter("topic", topicParamInQuery) .build(dmiServiceName, dmiProperties.getDmiBasePath()); - sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); - }); } - private void sendDataOperationRequestToDmiService(final String dmiUrl, - final List<DmiDataOperation> dmiDataOperationRequestBodies, - final String authorization) { + private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl, + final List<DmiDataOperation> dmiDataOperationRequestBodies, + final String authorization) { + final String dmiDataOperationRequestAsJsonString + = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies); + return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, + READ, authorization) + .then() + .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> { + handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies); + return Mono.empty(); + }); + } + + private String createDmiDataOperationRequestAsJsonString( + final List<DmiDataOperation> dmiDataOperationRequestBodies) { final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder() - .operations(dmiDataOperationRequestBodies).build(); - final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest); - try { - dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ, - authorization); - } catch (final DmiClientRequestException e) { - handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies); - } + .operations(dmiDataOperationRequestBodies) + .build(); + return jsonObjectMapper.asJsonString(dmiDataOperationRequest); } private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException, @@ -275,4 +293,4 @@ public class DmiDataOperations { ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId, cmHandleIdsPerResponseCodesPerOperation); } -} +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java index dc4108cac0..407fcf034e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -@Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j public class ResourceDataOperationRequestUtils { private static final String UNKNOWN_SERVICE_NAME = null; @@ -126,58 +125,6 @@ public class ResourceDataOperationRequestUtils { } /** - * Handles the async task completion for an entire data, publishing errors to client topic on task failure. - * - * @param topicParamInQuery client given topic - * @param requestId unique identifier per request - * @param dataOperationRequest incoming data operation request details - * @param throwable error cause, or null if task completed with no exception - */ - public static void handleAsyncTaskCompletionForDataOperationsRequest( - final String topicParamInQuery, - final String requestId, - final DataOperationRequest dataOperationRequest, - final Throwable throwable) { - if (throwable == null) { - log.info("Data operations request {} completed.", requestId); - } else if (throwable instanceof TimeoutException) { - log.error("Data operations request {} timed out.", requestId); - ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, - requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING); - } else { - log.error("Data operations request {} failed.", requestId, throwable); - ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, - requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR); - } - } - - /** - * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic. - * - * @param topicParamInQuery client given topic - * @param requestId unique identifier per request - * @param dataOperationRequestIn incoming data operation request details - * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations - */ - private static void publishErrorMessageToClientTopicForEntireOperation( - final String topicParamInQuery, - final String requestId, - final DataOperationRequest dataOperationRequestIn, - final NcmpResponseStatus ncmpResponseStatus) { - - final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>> - cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>(); - - for (final DataOperationDefinition dataOperationDefinitionIn : - dataOperationRequestIn.getDataOperationDefinitions()) { - cmHandleIdsPerResponseCodesPerOperation.add( - DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), - Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds())); - } - publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); - } - - /** * Creates data operation cloud event and publish it to client topic. * * @param clientTopic client given topic @@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils { final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, requestId, cmHandleIdsPerResponseCodesPerOperation); final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); + log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", + clientTopic, requestId, dataOperationCloudEvent.getId()); eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy index ad3f85c84a..a861809c64 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy @@ -21,6 +21,8 @@ package org.onap.cps.ncmp.api.impl.operations +import reactor.core.publisher.Mono + import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING @@ -100,14 +102,14 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId] and: 'a positive response from DMI service when it is called with valid request parameters' - def responseFromDmi = new ResponseEntity<Object>(HttpStatus.ACCEPTED) + def responseFromDmi = Mono.just(new ResponseEntity<Object>(HttpStatus.ACCEPTED)) def expectedDmiBatchResourceDataUrl = "someServiceName/dmi/v1/data?requestId=requestId&topic=my-topic-name" def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","moduleSetTag":"","cmHandleProperties":{"prop1":"val1"}}]}]}' - mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi - when: 'get resource data for group of cm handles are invoked' + mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi + when: 'get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER) - then: 'the post operation was called and ncmp generated dmi request body json args' - 1 * mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER) + then: 'the post operation was called with the expected URL and JSON request body' + 1 * mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER) } def 'Execute (async) data operation from DMI service with Exception.'() { @@ -116,12 +118,12 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId] - and: 'the published cloud will be captured' + and: 'the published cloud event will be captured' def actualDataOperationCloudEvent = null eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] } - and: 'a positive response from DMI service when it is called with valid request parameters' - mockDmiRestClient.postOperationWithJsonData(*_) >> { throw new DmiClientRequestException(123,'','', UNKNOWN_ERROR) } - when: 'attempt tp get resource data for group of cm handles are invoked' + and: 'a DMI client request exception is thrown when DMI service is called' + mockDmiRestClient.postOperationWithJsonDataAsync(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) } + when: 'attempt to get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'my-request-id', NO_AUTH_HEADER) then: 'the event contains the expected error details' def eventDataValue = extractDataValue(actualDataOperationCloudEvent) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy index 9028b9e5ed..6530685927 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy @@ -20,16 +20,20 @@ package org.onap.cps.ncmp.api.impl.utils.data.operation +import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent +import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.ADVISED +import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.READY + import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.events.EventsPublisher -import org.onap.cps.ncmp.api.NcmpResponseStatus +import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation +import org.onap.cps.ncmp.api.impl.operations.OperationType import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.impl.inventory.CmHandleState import org.onap.cps.ncmp.api.impl.inventory.CompositeStateBuilder import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.api.models.DataOperationRequest @@ -37,15 +41,11 @@ import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean -import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration - +import org.springframework.util.LinkedMultiValueMap import java.time.Duration -import java.util.concurrent.TimeoutException - -import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent -@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper]) +@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext]) class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { def static clientTopic = 'my-topic-name' @@ -57,9 +57,6 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { @SpringBean EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - @Autowired - ObjectMapper objectMapper - def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') @@ -135,34 +132,10 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson } - def 'Publish error response for entire data operations request when async task fails'() { - given: 'consumer subscribing to client topic' - def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer)) - cloudEventKafkaConsumer.subscribe([clientTopic]) - and: 'data operation request having non-ready and non-existing cm handle ids' - def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') - def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class) - when: 'an error occurs for the entire data operations request' - ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown) - and: 'subscribed client specified topic is polled and first record is selected' - def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() - def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) - then: 'data operation response event response size is 3' - dataOperationResponseEvent.data.responses.size() == 3 - and: 'all 3 have the expected error code' - dataOperationResponseEvent.data.responses.each { - assert it.statusCode == errorReportedToClientTopic.code - } - where: - scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic - 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING - 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR - } - static def getYangModelCmHandles() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] - def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() - def advisedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).withLastUpdatedTimeNow().build() + def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build() + def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build() return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), @@ -176,7 +149,16 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { static def getYangModelCmHandlesForOneCmHandle() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] - def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() + def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build() return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)] } + + def mockAndPopulateErrorMap(errorReportedToClientTopic) { + def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read')) + .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational') + .options('some-option').resourceIdentifier('some-resource-identifier').build() + def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>() + cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id'])) + return cmHandleIdsPerResponseCodesPerOperation + } } |