From 04dbe3800a0a9f9809cff2da59a31904a26f17ce Mon Sep 17 00:00:00 2001 From: sourabh_sourabh Date: Fri, 7 Jun 2024 19:10:45 +0100 Subject: #1: Used async version of web client for batch read operation - Exposed async version of post method into dmi rest client. - Code change is done to use async web client for batch data operation. - Use of CpsNcmpTaskExecutor code is removed. Issue-ID: CPS-2174 Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72 Signed-off-by: sourabh_sourabh --- .../NcmpPassthroughResourceRequestHandler.java | 43 +++------------- .../controller/NetworkCmProxyControllerSpec.groovy | 39 +++++++-------- .../NcmpDatastoreRequestHandlerSpec.groovy | 11 ++-- .../api/impl/NetworkCmProxyDataServiceImpl.java | 3 +- .../cps/ncmp/api/impl/client/DmiRestClient.java | 43 ++++++++++++---- .../api/impl/config/DmiWebClientConfiguration.java | 4 +- .../api/impl/operations/DmiDataOperations.java | 54 +++++++++++++------- .../ResourceDataOperationRequestUtils.java | 57 ++------------------- .../impl/operations/DmiDataOperationsSpec.groovy | 20 ++++---- .../ResourceDataOperationRequestUtilsSpec.groovy | 58 ++++++++-------------- 10 files changed, 136 insertions(+), 196 deletions(-) diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java index be5b93c47..d716877d5 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java @@ -25,13 +25,11 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ; import java.util.Map; import java.util.UUID; -import java.util.function.BiConsumer; import java.util.function.Supplier; import org.onap.cps.ncmp.api.NetworkCmProxyDataService; import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException; import org.onap.cps.ncmp.api.impl.operations.DatastoreType; import org.onap.cps.ncmp.api.impl.operations.OperationType; -import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils; import org.onap.cps.ncmp.api.models.CmResourceAddress; import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException; @@ -45,11 +43,7 @@ import org.springframework.stereotype.Component; public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler { private final NetworkCmProxyDataService networkCmProxyDataService; - - private static final Object noReturn = null; - private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 200; - private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles"; /** @@ -101,10 +95,8 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH final DataOperationRequest dataOperationRequest, final String authorization) { final String requestId = UUID.randomUUID().toString(); - cpsNcmpTaskExecutor.executeTaskWithErrorHandling( - getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization), - getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId), - timeOutInMilliSeconds); + networkCmProxyDataService.executeDataOperationForCmHandles(topicParamInQuery, dataOperationRequest, requestId, + authorization); return ResponseEntity.ok(Map.of("requestId", requestId)); } @@ -114,41 +106,18 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH dataOperationRequest.getDataOperationDefinitions().forEach(dataOperationDetail -> { if (OperationType.fromOperationName(dataOperationDetail.getOperation()) != READ) { throw new OperationNotSupportedException( - dataOperationDetail.getOperation() + " operation not yet supported"); + dataOperationDetail.getOperation() + " operation not yet supported"); } if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) { throw new InvalidDatastoreException(dataOperationDetail.getDatastore() - + " datastore is not supported"); + + " datastore is not supported"); } if (dataOperationDetail.getCmHandleIds().size() > MAXIMUM_CM_HANDLES_PER_OPERATION) { final String errorMessage = String.format(PAYLOAD_TOO_LARGE_TEMPLATE, - dataOperationDetail.getOperationId(), - dataOperationDetail.getCmHandleIds().size()); + dataOperationDetail.getOperationId(), + dataOperationDetail.getCmHandleIds().size()); throw new PayloadTooLargeException(errorMessage); } }); } - - private Supplier getTaskSupplierForDataOperationRequest(final String topicParamInQuery, - final DataOperationRequest dataOperationRequest, - final String requestId, - final String authorization) { - return () -> { - networkCmProxyDataService.executeDataOperationForCmHandles(topicParamInQuery, - dataOperationRequest, - requestId, - authorization); - return noReturn; - }; - } - - private static BiConsumer getTaskCompletionHandlerForDataOperationRequest( - final String topicParamInQuery, - final DataOperationRequest dataOperationRequest, - final String requestId) { - return (result, throwable) -> - ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery, - requestId, dataOperationRequest, throwable); - } - } diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index 2d7e9b2d0..c9dbc291c 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -23,6 +23,23 @@ package org.onap.cps.ncmp.rest.controller +import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.DataStores +import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.Operational +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.OPERATIONAL +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL +import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING +import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE +import static org.onap.cps.ncmp.api.impl.operations.OperationType.DELETE +import static org.onap.cps.ncmp.api.impl.operations.OperationType.PATCH +import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE +import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS +import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put + import ch.qos.logback.classic.Level import ch.qos.logback.classic.Logger import ch.qos.logback.classic.spi.ILoggingEvent @@ -62,28 +79,10 @@ import org.springframework.http.MediaType import org.springframework.test.web.servlet.MockMvc import spock.lang.Shared import spock.lang.Specification - import java.time.OffsetDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter -import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.DataStores -import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.Operational -import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.OPERATIONAL -import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL -import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING -import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE -import static org.onap.cps.ncmp.api.impl.operations.OperationType.DELETE -import static org.onap.cps.ncmp.api.impl.operations.OperationType.PATCH -import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE -import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS -import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put - @WebMvcTest(NetworkCmProxyController) class NetworkCmProxyControllerSpec extends Specification { @@ -197,8 +196,8 @@ class NetworkCmProxyControllerSpec extends Specification { assert response.status == HttpStatus.OK.value() and: 'async request id is generated' assert response.contentAsString.contains('requestId') - then: 'the request is handled asynchronously' - 1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_) + then: 'the request for (async) data operation invoked once' + 1 * mockNetworkCmProxyDataService.executeDataOperationForCmHandles('my-topic-name', _, _, null) where: 'the following data stores are used' datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL] } diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy index 641715d0d..8835c99fb 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy @@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { when: 'data operation request is executed' objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER) then: 'the task is executed in an async fashion or not' - expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_) + expectedCalls * mockNetworkCmProxyDataService.executeDataOperationForCmHandles('someTopic', _, _, null) where: 'the following parameters are used' scenario | notificationFeatureEnabled || expectedCalls 'on' | true || 1 @@ -99,10 +99,11 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { networkServiceMethodCalled = true } when: 'data operation request is executed' - objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER) - then: 'the task is executed in an async fashion' - 1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_) - and: 'the network service is invoked' + def response = objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER) + and: 'a successful response with request id is returned' + assert response.statusCode.value == 200 + assert response.body.requestId != null + then: 'the network service is invoked' new PollingConditions().within(1) { assert networkServiceMethodCalled == true } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index 6aa09767b..17b3d7ab1 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService final DataOperationRequest dataOperationRequest, final String requestId, final String authorization) { - dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, - authorization); + dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization); } @Override diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java index 7878c5d0b..5811cf97d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java @@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; @Component @RequiredArgsConstructor @@ -85,22 +86,40 @@ public class DmiRestClient { final String requestBodyAsJsonString, final OperationType operationType, final String authorization) { - final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA) - ? dataServicesWebClient : modelServicesWebClient; try { - return webClient.post() - .uri(toUri(dmiUrl)) - .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) - .body(BodyInserters.fromValue(requestBodyAsJsonString)) - .retrieve() - .toEntity(Object.class) - .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName())) - .block(); + return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType, + authorization).block(); } catch (final HttpServerErrorException e) { throw handleDmiClientException(e, operationType.getOperationName()); } } + /** + * Asynchronously performs an HTTP POST operation with the given JSON data. + * + * @param requiredDmiService The service object required for retrieving or configuring the WebClient. + * @param dmiUrl The URL to which the POST request is sent. + * @param requestBodyAsJsonString The JSON string that will be sent as the request body. + * @param operationType An enumeration or object that holds information about the type of operation + * being performed. + * @param authorization The authorization token to be added to the request headers. + * @return A Mono emitting the response entity containing the server's response. + */ + public Mono> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService, + final String dmiUrl, + final String requestBodyAsJsonString, + final OperationType operationType, + final String authorization) { + final WebClient webClient = getWebClient(requiredDmiService); + return webClient.post() + .uri(toUri(dmiUrl)) + .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) + .body(BodyInserters.fromValue(requestBodyAsJsonString)) + .retrieve() + .toEntity(Object.class) + .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName())); + } + /** * Get DMI plugin health status. * @@ -123,6 +142,10 @@ public class DmiRestClient { } } + private WebClient getWebClient(final RequiredDmiService requiredDmiService) { + return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient; + } + private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) { if (dmiProperties.isDmiBasicAuthEnabled()) { httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java index 2c0b70262..08885a9e0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java @@ -106,13 +106,11 @@ public class DmiWebClientConfiguration { final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName, maximumConnectionsTotal); - final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider) + return HttpClient.create(dmiWebClientConnectionProvider) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000) .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds, TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds, TimeUnit.SECONDS))); - httpClient.warmup().block(); - return httpClient; } private static WebClient buildAndGetWebClient(final HttpClient httpClient, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java index 978855569..786160a96 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.NcmpResponseStatus; import org.onap.cps.ncmp.api.impl.client.DmiRestClient; import org.onap.cps.ncmp.api.impl.config.DmiProperties; @@ -51,13 +50,14 @@ import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Operations class for DMI data. */ @RequiredArgsConstructor @Service -@Slf4j public class DmiDataOperations { private final InventoryPersistence inventoryPersistence; @@ -231,28 +231,46 @@ public class DmiDataOperations { groupsOutPerDmiServiceName, final String authorization) { - groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> { - final String dmiUrl = DmiServiceUrlBuilder.newInstance() + Flux.fromIterable(groupsOutPerDmiServiceName.entrySet()) + .flatMap(dmiDataOperationsByDmiServiceName -> { + final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey(); + final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery); + final List dmiDataOperationRequestBodies + = dmiDataOperationsByDmiServiceName.getValue(); + return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); + }) + .subscribe(); + } + + private String buildDmiServiceUrl(final String dmiServiceName, final String requestId, + final String topicParamInQuery) { + return DmiServiceUrlBuilder.newInstance() .pathSegment("data") .queryParameter("requestId", requestId) .queryParameter("topic", topicParamInQuery) .build(dmiServiceName, dmiProperties.getDmiBasePath()); - sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); - }); } - private void sendDataOperationRequestToDmiService(final String dmiUrl, - final List dmiDataOperationRequestBodies, - final String authorization) { + private Mono sendDataOperationRequestToDmiService(final String dmiUrl, + final List dmiDataOperationRequestBodies, + final String authorization) { + final String dmiDataOperationRequestAsJsonString + = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies); + return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, + READ, authorization) + .then() + .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> { + handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies); + return Mono.empty(); + }); + } + + private String createDmiDataOperationRequestAsJsonString( + final List dmiDataOperationRequestBodies) { final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder() - .operations(dmiDataOperationRequestBodies).build(); - final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest); - try { - dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ, - authorization); - } catch (final DmiClientRequestException e) { - handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies); - } + .operations(dmiDataOperationRequestBodies) + .build(); + return jsonObjectMapper.asJsonString(dmiDataOperationRequest); } private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException, @@ -275,4 +293,4 @@ public class DmiDataOperations { ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId, cmHandleIdsPerResponseCodesPerOperation); } -} +} \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java index dc4108cac..407fcf034 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -@Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j public class ResourceDataOperationRequestUtils { private static final String UNKNOWN_SERVICE_NAME = null; @@ -125,58 +124,6 @@ public class ResourceDataOperationRequestUtils { return moduleSetTagPerCmHandle; } - /** - * Handles the async task completion for an entire data, publishing errors to client topic on task failure. - * - * @param topicParamInQuery client given topic - * @param requestId unique identifier per request - * @param dataOperationRequest incoming data operation request details - * @param throwable error cause, or null if task completed with no exception - */ - public static void handleAsyncTaskCompletionForDataOperationsRequest( - final String topicParamInQuery, - final String requestId, - final DataOperationRequest dataOperationRequest, - final Throwable throwable) { - if (throwable == null) { - log.info("Data operations request {} completed.", requestId); - } else if (throwable instanceof TimeoutException) { - log.error("Data operations request {} timed out.", requestId); - ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, - requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING); - } else { - log.error("Data operations request {} failed.", requestId, throwable); - ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, - requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR); - } - } - - /** - * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic. - * - * @param topicParamInQuery client given topic - * @param requestId unique identifier per request - * @param dataOperationRequestIn incoming data operation request details - * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations - */ - private static void publishErrorMessageToClientTopicForEntireOperation( - final String topicParamInQuery, - final String requestId, - final DataOperationRequest dataOperationRequestIn, - final NcmpResponseStatus ncmpResponseStatus) { - - final MultiValueMap>> - cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>(); - - for (final DataOperationDefinition dataOperationDefinitionIn : - dataOperationRequestIn.getDataOperationDefinitions()) { - cmHandleIdsPerResponseCodesPerOperation.add( - DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), - Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds())); - } - publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); - } - /** * Creates data operation cloud event and publish it to client topic. * @@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils { final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, requestId, cmHandleIdsPerResponseCodesPerOperation); final EventsPublisher eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); + log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", + clientTopic, requestId, dataOperationCloudEvent.getId()); eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy index ad3f85c84..a861809c6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy @@ -21,6 +21,8 @@ package org.onap.cps.ncmp.api.impl.operations +import reactor.core.publisher.Mono + import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING @@ -100,14 +102,14 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId] and: 'a positive response from DMI service when it is called with valid request parameters' - def responseFromDmi = new ResponseEntity(HttpStatus.ACCEPTED) + def responseFromDmi = Mono.just(new ResponseEntity(HttpStatus.ACCEPTED)) def expectedDmiBatchResourceDataUrl = "someServiceName/dmi/v1/data?requestId=requestId&topic=my-topic-name" def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","moduleSetTag":"","cmHandleProperties":{"prop1":"val1"}}]}]}' - mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi - when: 'get resource data for group of cm handles are invoked' + mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi + when: 'get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER) - then: 'the post operation was called and ncmp generated dmi request body json args' - 1 * mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER) + then: 'the post operation was called with the expected URL and JSON request body' + 1 * mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER) } def 'Execute (async) data operation from DMI service with Exception.'() { @@ -116,12 +118,12 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId] - and: 'the published cloud will be captured' + and: 'the published cloud event will be captured' def actualDataOperationCloudEvent = null eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] } - and: 'a positive response from DMI service when it is called with valid request parameters' - mockDmiRestClient.postOperationWithJsonData(*_) >> { throw new DmiClientRequestException(123,'','', UNKNOWN_ERROR) } - when: 'attempt tp get resource data for group of cm handles are invoked' + and: 'a DMI client request exception is thrown when DMI service is called' + mockDmiRestClient.postOperationWithJsonDataAsync(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) } + when: 'attempt to get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'my-request-id', NO_AUTH_HEADER) then: 'the event contains the expected error details' def eventDataValue = extractDataValue(actualDataOperationCloudEvent) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy index 9028b9e5e..653068592 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy @@ -20,16 +20,20 @@ package org.onap.cps.ncmp.api.impl.utils.data.operation +import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent +import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.ADVISED +import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.READY + import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.cps.events.EventsPublisher -import org.onap.cps.ncmp.api.NcmpResponseStatus +import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation +import org.onap.cps.ncmp.api.impl.operations.OperationType import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle -import org.onap.cps.ncmp.api.impl.inventory.CmHandleState import org.onap.cps.ncmp.api.impl.inventory.CompositeStateBuilder import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.api.models.DataOperationRequest @@ -37,15 +41,11 @@ import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean -import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration - +import org.springframework.util.LinkedMultiValueMap import java.time.Duration -import java.util.concurrent.TimeoutException - -import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent -@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper]) +@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext]) class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { def static clientTopic = 'my-topic-name' @@ -57,9 +57,6 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { @SpringBean EventsPublisher eventPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - @Autowired - ObjectMapper objectMapper - def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') @@ -135,34 +132,10 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson } - def 'Publish error response for entire data operations request when async task fails'() { - given: 'consumer subscribing to client topic' - def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer)) - cloudEventKafkaConsumer.subscribe([clientTopic]) - and: 'data operation request having non-ready and non-existing cm handle ids' - def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') - def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class) - when: 'an error occurs for the entire data operations request' - ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown) - and: 'subscribed client specified topic is polled and first record is selected' - def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last() - def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class) - then: 'data operation response event response size is 3' - dataOperationResponseEvent.data.responses.size() == 3 - and: 'all 3 have the expected error code' - dataOperationResponseEvent.data.responses.each { - assert it.statusCode == errorReportedToClientTopic.code - } - where: - scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic - 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING - 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR - } - static def getYangModelCmHandles() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] - def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() - def advisedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).withLastUpdatedTimeNow().build() + def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build() + def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build() return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState), @@ -176,7 +149,16 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec { static def getYangModelCmHandlesForOneCmHandle() { def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')] - def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build() + def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build() return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)] } + + def mockAndPopulateErrorMap(errorReportedToClientTopic) { + def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read')) + .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational') + .options('some-option').resourceIdentifier('some-resource-identifier').build() + def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>() + cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id'])) + return cmHandleIdsPerResponseCodesPerOperation + } } -- cgit 1.2.3-korg