diff options
author | sourabh_sourabh <sourabh.sourabh@est.tech> | 2024-06-07 19:10:45 +0100 |
---|---|---|
committer | sourabh_sourabh <sourabh.sourabh@est.tech> | 2024-06-13 10:34:42 +0100 |
commit | 04dbe3800a0a9f9809cff2da59a31904a26f17ce (patch) | |
tree | eae0353400a41c1ddd603c1ce5799f739c4d5233 /cps-ncmp-service/src/main/java/org | |
parent | d3e64201a957ca4a1538ea0962c3e5218a5d34e8 (diff) |
#1: Used async version of web client for batch read operation
- Exposed async version of post method into dmi rest client.
- Code change is done to use async web client for batch data
operation.
- Use of CpsNcmpTaskExecutor code is removed.
Issue-ID: CPS-2174
Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
5 files changed, 74 insertions, 87 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index 6aa09767be..17b3d7ab1e 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService final DataOperationRequest dataOperationRequest, final String requestId, final String authorization) { - dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, - authorization); + dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization); } @Override diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java index 7878c5d0ba..5811cf97da 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java @@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientRequestException; import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; @Component @RequiredArgsConstructor @@ -85,23 +86,41 @@ public class DmiRestClient { final String requestBodyAsJsonString, final OperationType operationType, final String authorization) { - final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA) - ? dataServicesWebClient : modelServicesWebClient; try { - return webClient.post() - .uri(toUri(dmiUrl)) - .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) - .body(BodyInserters.fromValue(requestBodyAsJsonString)) - .retrieve() - .toEntity(Object.class) - .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName())) - .block(); + return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType, + authorization).block(); } catch (final HttpServerErrorException e) { throw handleDmiClientException(e, operationType.getOperationName()); } } /** + * Asynchronously performs an HTTP POST operation with the given JSON data. + * + * @param requiredDmiService The service object required for retrieving or configuring the WebClient. + * @param dmiUrl The URL to which the POST request is sent. + * @param requestBodyAsJsonString The JSON string that will be sent as the request body. + * @param operationType An enumeration or object that holds information about the type of operation + * being performed. + * @param authorization The authorization token to be added to the request headers. + * @return A Mono emitting the response entity containing the server's response. + */ + public Mono<ResponseEntity<Object>> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService, + final String dmiUrl, + final String requestBodyAsJsonString, + final OperationType operationType, + final String authorization) { + final WebClient webClient = getWebClient(requiredDmiService); + return webClient.post() + .uri(toUri(dmiUrl)) + .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) + .body(BodyInserters.fromValue(requestBodyAsJsonString)) + .retrieve() + .toEntity(Object.class) + .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName())); + } + + /** * Get DMI plugin health status. * * @param dmiUrl the base URL of the dmi-plugin @@ -123,6 +142,10 @@ public class DmiRestClient { } } + private WebClient getWebClient(final RequiredDmiService requiredDmiService) { + return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient; + } + private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) { if (dmiProperties.isDmiBasicAuthEnabled()) { httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java index 2c0b702627..08885a9e04 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java @@ -106,13 +106,11 @@ public class DmiWebClientConfiguration { final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName, maximumConnectionsTotal); - final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider) + return HttpClient.create(dmiWebClientConnectionProvider) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000) .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds, TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds, TimeUnit.SECONDS))); - httpClient.warmup().block(); - return httpClient; } private static WebClient buildAndGetWebClient(final HttpClient httpClient, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java index 978855569a..786160a964 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.NcmpResponseStatus; import org.onap.cps.ncmp.api.impl.client.DmiRestClient; import org.onap.cps.ncmp.api.impl.config.DmiProperties; @@ -51,13 +50,14 @@ import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Operations class for DMI data. */ @RequiredArgsConstructor @Service -@Slf4j public class DmiDataOperations { private final InventoryPersistence inventoryPersistence; @@ -231,28 +231,46 @@ public class DmiDataOperations { groupsOutPerDmiServiceName, final String authorization) { - groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> { - final String dmiUrl = DmiServiceUrlBuilder.newInstance() + Flux.fromIterable(groupsOutPerDmiServiceName.entrySet()) + .flatMap(dmiDataOperationsByDmiServiceName -> { + final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey(); + final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery); + final List<DmiDataOperation> dmiDataOperationRequestBodies + = dmiDataOperationsByDmiServiceName.getValue(); + return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); + }) + .subscribe(); + } + + private String buildDmiServiceUrl(final String dmiServiceName, final String requestId, + final String topicParamInQuery) { + return DmiServiceUrlBuilder.newInstance() .pathSegment("data") .queryParameter("requestId", requestId) .queryParameter("topic", topicParamInQuery) .build(dmiServiceName, dmiProperties.getDmiBasePath()); - sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization); - }); } - private void sendDataOperationRequestToDmiService(final String dmiUrl, - final List<DmiDataOperation> dmiDataOperationRequestBodies, - final String authorization) { + private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl, + final List<DmiDataOperation> dmiDataOperationRequestBodies, + final String authorization) { + final String dmiDataOperationRequestAsJsonString + = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies); + return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, + READ, authorization) + .then() + .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> { + handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies); + return Mono.empty(); + }); + } + + private String createDmiDataOperationRequestAsJsonString( + final List<DmiDataOperation> dmiDataOperationRequestBodies) { final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder() - .operations(dmiDataOperationRequestBodies).build(); - final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest); - try { - dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ, - authorization); - } catch (final DmiClientRequestException e) { - handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies); - } + .operations(dmiDataOperationRequestBodies) + .build(); + return jsonObjectMapper.asJsonString(dmiDataOperationRequest); } private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException, @@ -275,4 +293,4 @@ public class DmiDataOperations { ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId, cmHandleIdsPerResponseCodesPerOperation); } -} +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java index dc4108cac0..407fcf034e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -@Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j public class ResourceDataOperationRequestUtils { private static final String UNKNOWN_SERVICE_NAME = null; @@ -126,58 +125,6 @@ public class ResourceDataOperationRequestUtils { } /** - * Handles the async task completion for an entire data, publishing errors to client topic on task failure. - * - * @param topicParamInQuery client given topic - * @param requestId unique identifier per request - * @param dataOperationRequest incoming data operation request details - * @param throwable error cause, or null if task completed with no exception - */ - public static void handleAsyncTaskCompletionForDataOperationsRequest( - final String topicParamInQuery, - final String requestId, - final DataOperationRequest dataOperationRequest, - final Throwable throwable) { - if (throwable == null) { - log.info("Data operations request {} completed.", requestId); - } else if (throwable instanceof TimeoutException) { - log.error("Data operations request {} timed out.", requestId); - ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, - requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING); - } else { - log.error("Data operations request {} failed.", requestId, throwable); - ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery, - requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR); - } - } - - /** - * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic. - * - * @param topicParamInQuery client given topic - * @param requestId unique identifier per request - * @param dataOperationRequestIn incoming data operation request details - * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations - */ - private static void publishErrorMessageToClientTopicForEntireOperation( - final String topicParamInQuery, - final String requestId, - final DataOperationRequest dataOperationRequestIn, - final NcmpResponseStatus ncmpResponseStatus) { - - final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>> - cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>(); - - for (final DataOperationDefinition dataOperationDefinitionIn : - dataOperationRequestIn.getDataOperationDefinitions()) { - cmHandleIdsPerResponseCodesPerOperation.add( - DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn), - Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds())); - } - publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation); - } - - /** * Creates data operation cloud event and publish it to client topic. * * @param clientTopic client given topic @@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils { final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, requestId, cmHandleIdsPerResponseCodesPerOperation); final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class); + log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", + clientTopic, requestId, dataOperationCloudEvent.getId()); eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent); } } |