summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsourabh_sourabh <sourabh.sourabh@est.tech>2024-06-07 19:10:45 +0100
committersourabh_sourabh <sourabh.sourabh@est.tech>2024-06-13 10:34:42 +0100
commit04dbe3800a0a9f9809cff2da59a31904a26f17ce (patch)
treeeae0353400a41c1ddd603c1ce5799f739c4d5233
parentd3e64201a957ca4a1538ea0962c3e5218a5d34e8 (diff)
#1: Used async version of web client for batch read operation
- Exposed async version of post method into dmi rest client. - Code change is done to use async web client for batch data operation. - Use of CpsNcmpTaskExecutor code is removed. Issue-ID: CPS-2174 Change-Id: I7840fd8c6d9debe42e50c860f9cf39d64274df72 Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java43
-rw-r--r--cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy39
-rw-r--r--cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy11
-rwxr-xr-xcps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java54
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java57
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy20
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy58
10 files changed, 136 insertions, 196 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
index be5b93c47..d716877d5 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
@@ -25,13 +25,11 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
import java.util.Map;
import java.util.UUID;
-import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
import org.onap.cps.ncmp.api.impl.operations.OperationType;
-import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
import org.onap.cps.ncmp.api.models.CmResourceAddress;
import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
@@ -45,11 +43,7 @@ import org.springframework.stereotype.Component;
public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler {
private final NetworkCmProxyDataService networkCmProxyDataService;
-
- private static final Object noReturn = null;
-
private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 200;
-
private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles";
/**
@@ -101,10 +95,8 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
final DataOperationRequest dataOperationRequest,
final String authorization) {
final String requestId = UUID.randomUUID().toString();
- cpsNcmpTaskExecutor.executeTaskWithErrorHandling(
- getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization),
- getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId),
- timeOutInMilliSeconds);
+ networkCmProxyDataService.executeDataOperationForCmHandles(topicParamInQuery, dataOperationRequest, requestId,
+ authorization);
return ResponseEntity.ok(Map.of("requestId", requestId));
}
@@ -114,41 +106,18 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
dataOperationRequest.getDataOperationDefinitions().forEach(dataOperationDetail -> {
if (OperationType.fromOperationName(dataOperationDetail.getOperation()) != READ) {
throw new OperationNotSupportedException(
- dataOperationDetail.getOperation() + " operation not yet supported");
+ dataOperationDetail.getOperation() + " operation not yet supported");
}
if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) {
throw new InvalidDatastoreException(dataOperationDetail.getDatastore()
- + " datastore is not supported");
+ + " datastore is not supported");
}
if (dataOperationDetail.getCmHandleIds().size() > MAXIMUM_CM_HANDLES_PER_OPERATION) {
final String errorMessage = String.format(PAYLOAD_TOO_LARGE_TEMPLATE,
- dataOperationDetail.getOperationId(),
- dataOperationDetail.getCmHandleIds().size());
+ dataOperationDetail.getOperationId(),
+ dataOperationDetail.getCmHandleIds().size());
throw new PayloadTooLargeException(errorMessage);
}
});
}
-
- private Supplier<Object> getTaskSupplierForDataOperationRequest(final String topicParamInQuery,
- final DataOperationRequest dataOperationRequest,
- final String requestId,
- final String authorization) {
- return () -> {
- networkCmProxyDataService.executeDataOperationForCmHandles(topicParamInQuery,
- dataOperationRequest,
- requestId,
- authorization);
- return noReturn;
- };
- }
-
- private static BiConsumer<Object, Throwable> getTaskCompletionHandlerForDataOperationRequest(
- final String topicParamInQuery,
- final DataOperationRequest dataOperationRequest,
- final String requestId) {
- return (result, throwable) ->
- ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery,
- requestId, dataOperationRequest, throwable);
- }
-
}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
index 2d7e9b2d0..c9dbc291c 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
@@ -23,6 +23,23 @@
package org.onap.cps.ncmp.rest.controller
+import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.DataStores
+import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.Operational
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.OPERATIONAL
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
+import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.DELETE
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.PATCH
+import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
+import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
+import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
+
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import ch.qos.logback.classic.spi.ILoggingEvent
@@ -62,28 +79,10 @@ import org.springframework.http.MediaType
import org.springframework.test.web.servlet.MockMvc
import spock.lang.Shared
import spock.lang.Specification
-
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
-import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.DataStores
-import static org.onap.cps.ncmp.api.impl.inventory.CompositeState.Operational
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.OPERATIONAL
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.DELETE
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.PATCH
-import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
-import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
-import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.patch
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
-
@WebMvcTest(NetworkCmProxyController)
class NetworkCmProxyControllerSpec extends Specification {
@@ -197,8 +196,8 @@ class NetworkCmProxyControllerSpec extends Specification {
assert response.status == HttpStatus.OK.value()
and: 'async request id is generated'
assert response.contentAsString.contains('requestId')
- then: 'the request is handled asynchronously'
- 1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_)
+ then: 'the request for (async) data operation invoked once'
+ 1 * mockNetworkCmProxyDataService.executeDataOperationForCmHandles('my-topic-name', _, _, null)
where: 'the following data stores are used'
datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL]
}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
index 641715d0d..8835c99fb 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
@@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
when: 'data operation request is executed'
objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER)
then: 'the task is executed in an async fashion or not'
- expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
+ expectedCalls * mockNetworkCmProxyDataService.executeDataOperationForCmHandles('someTopic', _, _, null)
where: 'the following parameters are used'
scenario | notificationFeatureEnabled || expectedCalls
'on' | true || 1
@@ -99,10 +99,11 @@ class NcmpDatastoreRequestHandlerSpec extends Specification {
networkServiceMethodCalled = true
}
when: 'data operation request is executed'
- objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
- then: 'the task is executed in an async fashion'
- 1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_)
- and: 'the network service is invoked'
+ def response = objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER)
+ and: 'a successful response with request id is returned'
+ assert response.statusCode.value == 200
+ assert response.body.requestId != null
+ then: 'the network service is invoked'
new PollingConditions().within(1) {
assert networkServiceMethodCalled == true
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
index 6aa09767b..17b3d7ab1 100755
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
@@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
final DataOperationRequest dataOperationRequest,
final String requestId,
final String authorization) {
- dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId,
- authorization);
+ dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization);
}
@Override
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
index 7878c5d0b..5811cf97d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
@@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
@Component
@RequiredArgsConstructor
@@ -85,23 +86,41 @@ public class DmiRestClient {
final String requestBodyAsJsonString,
final OperationType operationType,
final String authorization) {
- final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA)
- ? dataServicesWebClient : modelServicesWebClient;
try {
- return webClient.post()
- .uri(toUri(dmiUrl))
- .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
- .body(BodyInserters.fromValue(requestBodyAsJsonString))
- .retrieve()
- .toEntity(Object.class)
- .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName()))
- .block();
+ return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType,
+ authorization).block();
} catch (final HttpServerErrorException e) {
throw handleDmiClientException(e, operationType.getOperationName());
}
}
/**
+ * Asynchronously performs an HTTP POST operation with the given JSON data.
+ *
+ * @param requiredDmiService The service object required for retrieving or configuring the WebClient.
+ * @param dmiUrl The URL to which the POST request is sent.
+ * @param requestBodyAsJsonString The JSON string that will be sent as the request body.
+ * @param operationType An enumeration or object that holds information about the type of operation
+ * being performed.
+ * @param authorization The authorization token to be added to the request headers.
+ * @return A Mono emitting the response entity containing the server's response.
+ */
+ public Mono<ResponseEntity<Object>> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService,
+ final String dmiUrl,
+ final String requestBodyAsJsonString,
+ final OperationType operationType,
+ final String authorization) {
+ final WebClient webClient = getWebClient(requiredDmiService);
+ return webClient.post()
+ .uri(toUri(dmiUrl))
+ .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
+ .body(BodyInserters.fromValue(requestBodyAsJsonString))
+ .retrieve()
+ .toEntity(Object.class)
+ .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName()));
+ }
+
+ /**
* Get DMI plugin health status.
*
* @param dmiUrl the base URL of the dmi-plugin
@@ -123,6 +142,10 @@ public class DmiRestClient {
}
}
+ private WebClient getWebClient(final RequiredDmiService requiredDmiService) {
+ return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient;
+ }
+
private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) {
if (dmiProperties.isDmiBasicAuthEnabled()) {
httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword());
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
index 2c0b70262..08885a9e0 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
@@ -106,13 +106,11 @@ public class DmiWebClientConfiguration {
final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName,
maximumConnectionsTotal);
- final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider)
+ return HttpClient.create(dmiWebClientConnectionProvider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000)
.doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds,
TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds,
TimeUnit.SECONDS)));
- httpClient.warmup().block();
- return httpClient;
}
private static WebClient buildAndGetWebClient(final HttpClient httpClient,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
index 978855569..786160a96 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
import org.onap.cps.ncmp.api.impl.config.DmiProperties;
@@ -51,13 +50,14 @@ import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* Operations class for DMI data.
*/
@RequiredArgsConstructor
@Service
-@Slf4j
public class DmiDataOperations {
private final InventoryPersistence inventoryPersistence;
@@ -231,28 +231,46 @@ public class DmiDataOperations {
groupsOutPerDmiServiceName,
final String authorization) {
- groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> {
- final String dmiUrl = DmiServiceUrlBuilder.newInstance()
+ Flux.fromIterable(groupsOutPerDmiServiceName.entrySet())
+ .flatMap(dmiDataOperationsByDmiServiceName -> {
+ final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey();
+ final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery);
+ final List<DmiDataOperation> dmiDataOperationRequestBodies
+ = dmiDataOperationsByDmiServiceName.getValue();
+ return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
+ })
+ .subscribe();
+ }
+
+ private String buildDmiServiceUrl(final String dmiServiceName, final String requestId,
+ final String topicParamInQuery) {
+ return DmiServiceUrlBuilder.newInstance()
.pathSegment("data")
.queryParameter("requestId", requestId)
.queryParameter("topic", topicParamInQuery)
.build(dmiServiceName, dmiProperties.getDmiBasePath());
- sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
- });
}
- private void sendDataOperationRequestToDmiService(final String dmiUrl,
- final List<DmiDataOperation> dmiDataOperationRequestBodies,
- final String authorization) {
+ private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl,
+ final List<DmiDataOperation> dmiDataOperationRequestBodies,
+ final String authorization) {
+ final String dmiDataOperationRequestAsJsonString
+ = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies);
+ return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString,
+ READ, authorization)
+ .then()
+ .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> {
+ handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies);
+ return Mono.empty();
+ });
+ }
+
+ private String createDmiDataOperationRequestAsJsonString(
+ final List<DmiDataOperation> dmiDataOperationRequestBodies) {
final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
- .operations(dmiDataOperationRequestBodies).build();
- final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest);
- try {
- dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ,
- authorization);
- } catch (final DmiClientRequestException e) {
- handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies);
- }
+ .operations(dmiDataOperationRequestBodies)
+ .build();
+ return jsonObjectMapper.asJsonString(dmiDataOperationRequest);
}
private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException,
@@ -275,4 +293,4 @@ public class DmiDataOperations {
ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
cmHandleIdsPerResponseCodesPerOperation);
}
-}
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
index dc4108cac..407fcf034 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
public class ResourceDataOperationRequestUtils {
private static final String UNKNOWN_SERVICE_NAME = null;
@@ -126,58 +125,6 @@ public class ResourceDataOperationRequestUtils {
}
/**
- * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
- *
- * @param topicParamInQuery client given topic
- * @param requestId unique identifier per request
- * @param dataOperationRequest incoming data operation request details
- * @param throwable error cause, or null if task completed with no exception
- */
- public static void handleAsyncTaskCompletionForDataOperationsRequest(
- final String topicParamInQuery,
- final String requestId,
- final DataOperationRequest dataOperationRequest,
- final Throwable throwable) {
- if (throwable == null) {
- log.info("Data operations request {} completed.", requestId);
- } else if (throwable instanceof TimeoutException) {
- log.error("Data operations request {} timed out.", requestId);
- ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
- requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
- } else {
- log.error("Data operations request {} failed.", requestId, throwable);
- ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
- requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
- }
- }
-
- /**
- * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
- *
- * @param topicParamInQuery client given topic
- * @param requestId unique identifier per request
- * @param dataOperationRequestIn incoming data operation request details
- * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations
- */
- private static void publishErrorMessageToClientTopicForEntireOperation(
- final String topicParamInQuery,
- final String requestId,
- final DataOperationRequest dataOperationRequestIn,
- final NcmpResponseStatus ncmpResponseStatus) {
-
- final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
- cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
-
- for (final DataOperationDefinition dataOperationDefinitionIn :
- dataOperationRequestIn.getDataOperationDefinitions()) {
- cmHandleIdsPerResponseCodesPerOperation.add(
- DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
- Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
- }
- publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
- }
-
- /**
* Creates data operation cloud event and publish it to client topic.
*
* @param clientTopic client given topic
@@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils {
final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
requestId, cmHandleIdsPerResponseCodesPerOperation);
final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+ log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
+ clientTopic, requestId, dataOperationCloudEvent.getId());
eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
index ad3f85c84..a861809c6 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
@@ -21,6 +21,8 @@
package org.onap.cps.ncmp.api.impl.operations
+import reactor.core.publisher.Mono
+
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
@@ -100,14 +102,14 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId]
and: 'a positive response from DMI service when it is called with valid request parameters'
- def responseFromDmi = new ResponseEntity<Object>(HttpStatus.ACCEPTED)
+ def responseFromDmi = Mono.just(new ResponseEntity<Object>(HttpStatus.ACCEPTED))
def expectedDmiBatchResourceDataUrl = "someServiceName/dmi/v1/data?requestId=requestId&topic=my-topic-name"
def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","moduleSetTag":"","cmHandleProperties":{"prop1":"val1"}}]}]}'
- mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi
- when: 'get resource data for group of cm handles are invoked'
+ mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi
+ when: 'get resource data for group of cm handles is invoked'
objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER)
- then: 'the post operation was called and ncmp generated dmi request body json args'
- 1 * mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER)
+ then: 'the post operation was called with the expected URL and JSON request body'
+ 1 * mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER)
}
def 'Execute (async) data operation from DMI service with Exception.'() {
@@ -116,12 +118,12 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId]
- and: 'the published cloud will be captured'
+ and: 'the published cloud event will be captured'
def actualDataOperationCloudEvent = null
eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
- and: 'a positive response from DMI service when it is called with valid request parameters'
- mockDmiRestClient.postOperationWithJsonData(*_) >> { throw new DmiClientRequestException(123,'','', UNKNOWN_ERROR) }
- when: 'attempt tp get resource data for group of cm handles are invoked'
+ and: 'a DMI client request exception is thrown when DMI service is called'
+ mockDmiRestClient.postOperationWithJsonDataAsync(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
+ when: 'attempt to get resource data for group of cm handles is invoked'
objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'my-request-id', NO_AUTH_HEADER)
then: 'the event contains the expected error details'
def eventDataValue = extractDataValue(actualDataOperationCloudEvent)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
index 9028b9e5e..653068592 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
@@ -20,16 +20,20 @@
package org.onap.cps.ncmp.api.impl.utils.data.operation
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.ADVISED
+import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.READY
+
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.events.EventsPublisher
-import org.onap.cps.ncmp.api.NcmpResponseStatus
+import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation
+import org.onap.cps.ncmp.api.impl.operations.OperationType
import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
import org.onap.cps.ncmp.api.impl.inventory.CompositeStateBuilder
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.api.models.DataOperationRequest
@@ -37,15 +41,11 @@ import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
-
+import org.springframework.util.LinkedMultiValueMap
import java.time.Duration
-import java.util.concurrent.TimeoutException
-
-import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
+@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext])
class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
def static clientTopic = 'my-topic-name'
@@ -57,9 +57,6 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
@SpringBean
EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
- @Autowired
- ObjectMapper objectMapper
-
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -135,34 +132,10 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
}
- def 'Publish error response for entire data operations request when async task fails'() {
- given: 'consumer subscribing to client topic'
- def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
- cloudEventKafkaConsumer.subscribe([clientTopic])
- and: 'data operation request having non-ready and non-existing cm handle ids'
- def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
- def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
- when: 'an error occurs for the entire data operations request'
- ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
- and: 'subscribed client specified topic is polled and first record is selected'
- def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
- def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
- then: 'data operation response event response size is 3'
- dataOperationResponseEvent.data.responses.size() == 3
- and: 'all 3 have the expected error code'
- dataOperationResponseEvent.data.responses.each {
- assert it.statusCode == errorReportedToClientTopic.code
- }
- where:
- scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic
- 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
- 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR
- }
-
static def getYangModelCmHandles() {
def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
- def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
- def advisedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).withLastUpdatedTimeNow().build()
+ def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
+ def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build()
return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
@@ -176,7 +149,16 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
static def getYangModelCmHandlesForOneCmHandle() {
def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
- def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
+ def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)]
}
+
+ def mockAndPopulateErrorMap(errorReportedToClientTopic) {
+ def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read'))
+ .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational')
+ .options('some-option').resourceIdentifier('some-resource-identifier').build()
+ def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>()
+ cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id']))
+ return cmHandleIdsPerResponseCodesPerOperation
+ }
}