summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org
diff options
context:
space:
mode:
authorsourabh_sourabh <sourabh.sourabh@est.tech>2024-06-07 19:10:45 +0100
committersourabh_sourabh <sourabh.sourabh@est.tech>2024-06-13 10:34:42 +0100
commit04dbe3800a0a9f9809cff2da59a31904a26f17ce (patch)
treeeae0353400a41c1ddd603c1ce5799f739c4d5233 /cps-ncmp-service/src/main/java/org
parentd3e64201a957ca4a1538ea0962c3e5218a5d34e8 (diff)
#1: Used async version of web client for batch read operation
- Exposed async version of post method into dmi rest client. - Code change is done to use async web client for batch data operation. - Use of CpsNcmpTaskExecutor code is removed. Issue-ID: CPS-2174 Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72 Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
-rwxr-xr-xcps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java54
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java57
5 files changed, 74 insertions, 87 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
index 6aa09767be..17b3d7ab1e 100755
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
@@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
final DataOperationRequest dataOperationRequest,
final String requestId,
final String authorization) {
- dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId,
- authorization);
+ dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization);
}
@Override
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
index 7878c5d0ba..5811cf97da 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
@@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
@Component
@RequiredArgsConstructor
@@ -85,23 +86,41 @@ public class DmiRestClient {
final String requestBodyAsJsonString,
final OperationType operationType,
final String authorization) {
- final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA)
- ? dataServicesWebClient : modelServicesWebClient;
try {
- return webClient.post()
- .uri(toUri(dmiUrl))
- .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
- .body(BodyInserters.fromValue(requestBodyAsJsonString))
- .retrieve()
- .toEntity(Object.class)
- .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName()))
- .block();
+ return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType,
+ authorization).block();
} catch (final HttpServerErrorException e) {
throw handleDmiClientException(e, operationType.getOperationName());
}
}
/**
+ * Asynchronously performs an HTTP POST operation with the given JSON data.
+ *
+ * @param requiredDmiService The service object required for retrieving or configuring the WebClient.
+ * @param dmiUrl The URL to which the POST request is sent.
+ * @param requestBodyAsJsonString The JSON string that will be sent as the request body.
+ * @param operationType An enumeration or object that holds information about the type of operation
+ * being performed.
+ * @param authorization The authorization token to be added to the request headers.
+ * @return A Mono emitting the response entity containing the server's response.
+ */
+ public Mono<ResponseEntity<Object>> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService,
+ final String dmiUrl,
+ final String requestBodyAsJsonString,
+ final OperationType operationType,
+ final String authorization) {
+ final WebClient webClient = getWebClient(requiredDmiService);
+ return webClient.post()
+ .uri(toUri(dmiUrl))
+ .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
+ .body(BodyInserters.fromValue(requestBodyAsJsonString))
+ .retrieve()
+ .toEntity(Object.class)
+ .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName()));
+ }
+
+ /**
* Get DMI plugin health status.
*
* @param dmiUrl the base URL of the dmi-plugin
@@ -123,6 +142,10 @@ public class DmiRestClient {
}
}
+ private WebClient getWebClient(final RequiredDmiService requiredDmiService) {
+ return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient;
+ }
+
private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) {
if (dmiProperties.isDmiBasicAuthEnabled()) {
httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword());
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
index 2c0b702627..08885a9e04 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
@@ -106,13 +106,11 @@ public class DmiWebClientConfiguration {
final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName,
maximumConnectionsTotal);
- final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider)
+ return HttpClient.create(dmiWebClientConnectionProvider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000)
.doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds,
TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds,
TimeUnit.SECONDS)));
- httpClient.warmup().block();
- return httpClient;
}
private static WebClient buildAndGetWebClient(final HttpClient httpClient,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
index 978855569a..786160a964 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
import org.onap.cps.ncmp.api.impl.config.DmiProperties;
@@ -51,13 +50,14 @@ import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* Operations class for DMI data.
*/
@RequiredArgsConstructor
@Service
-@Slf4j
public class DmiDataOperations {
private final InventoryPersistence inventoryPersistence;
@@ -231,28 +231,46 @@ public class DmiDataOperations {
groupsOutPerDmiServiceName,
final String authorization) {
- groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> {
- final String dmiUrl = DmiServiceUrlBuilder.newInstance()
+ Flux.fromIterable(groupsOutPerDmiServiceName.entrySet())
+ .flatMap(dmiDataOperationsByDmiServiceName -> {
+ final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey();
+ final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery);
+ final List<DmiDataOperation> dmiDataOperationRequestBodies
+ = dmiDataOperationsByDmiServiceName.getValue();
+ return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
+ })
+ .subscribe();
+ }
+
+ private String buildDmiServiceUrl(final String dmiServiceName, final String requestId,
+ final String topicParamInQuery) {
+ return DmiServiceUrlBuilder.newInstance()
.pathSegment("data")
.queryParameter("requestId", requestId)
.queryParameter("topic", topicParamInQuery)
.build(dmiServiceName, dmiProperties.getDmiBasePath());
- sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
- });
}
- private void sendDataOperationRequestToDmiService(final String dmiUrl,
- final List<DmiDataOperation> dmiDataOperationRequestBodies,
- final String authorization) {
+ private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl,
+ final List<DmiDataOperation> dmiDataOperationRequestBodies,
+ final String authorization) {
+ final String dmiDataOperationRequestAsJsonString
+ = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies);
+ return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString,
+ READ, authorization)
+ .then()
+ .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> {
+ handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies);
+ return Mono.empty();
+ });
+ }
+
+ private String createDmiDataOperationRequestAsJsonString(
+ final List<DmiDataOperation> dmiDataOperationRequestBodies) {
final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
- .operations(dmiDataOperationRequestBodies).build();
- final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest);
- try {
- dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ,
- authorization);
- } catch (final DmiClientRequestException e) {
- handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies);
- }
+ .operations(dmiDataOperationRequestBodies)
+ .build();
+ return jsonObjectMapper.asJsonString(dmiDataOperationRequest);
}
private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException,
@@ -275,4 +293,4 @@ public class DmiDataOperations {
ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
cmHandleIdsPerResponseCodesPerOperation);
}
-}
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
index dc4108cac0..407fcf034e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
public class ResourceDataOperationRequestUtils {
private static final String UNKNOWN_SERVICE_NAME = null;
@@ -126,58 +125,6 @@ public class ResourceDataOperationRequestUtils {
}
/**
- * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
- *
- * @param topicParamInQuery client given topic
- * @param requestId unique identifier per request
- * @param dataOperationRequest incoming data operation request details
- * @param throwable error cause, or null if task completed with no exception
- */
- public static void handleAsyncTaskCompletionForDataOperationsRequest(
- final String topicParamInQuery,
- final String requestId,
- final DataOperationRequest dataOperationRequest,
- final Throwable throwable) {
- if (throwable == null) {
- log.info("Data operations request {} completed.", requestId);
- } else if (throwable instanceof TimeoutException) {
- log.error("Data operations request {} timed out.", requestId);
- ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
- requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
- } else {
- log.error("Data operations request {} failed.", requestId, throwable);
- ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
- requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
- }
- }
-
- /**
- * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
- *
- * @param topicParamInQuery client given topic
- * @param requestId unique identifier per request
- * @param dataOperationRequestIn incoming data operation request details
- * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations
- */
- private static void publishErrorMessageToClientTopicForEntireOperation(
- final String topicParamInQuery,
- final String requestId,
- final DataOperationRequest dataOperationRequestIn,
- final NcmpResponseStatus ncmpResponseStatus) {
-
- final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
- cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
-
- for (final DataOperationDefinition dataOperationDefinitionIn :
- dataOperationRequestIn.getDataOperationDefinitions()) {
- cmHandleIdsPerResponseCodesPerOperation.add(
- DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
- Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
- }
- publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
- }
-
- /**
* Creates data operation cloud event and publish it to client topic.
*
* @param clientTopic client given topic
@@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils {
final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
requestId, cmHandleIdsPerResponseCodesPerOperation);
final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+ log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
+ clientTopic, requestId, dataOperationCloudEvent.getId());
eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
}
}