summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src')
-rwxr-xr-xcps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java3
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java43
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java4
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java54
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java57
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy20
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy58
7 files changed, 105 insertions, 134 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
index 6aa09767be..17b3d7ab1e 100755
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
@@ -160,8 +160,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
final DataOperationRequest dataOperationRequest,
final String requestId,
final String authorization) {
- dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId,
- authorization);
+ dmiDataOperations.requestResourceDataFromDmi(topicParamInQuery, dataOperationRequest, requestId, authorization);
}
@Override
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
index 7878c5d0ba..5811cf97da 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/client/DmiRestClient.java
@@ -50,6 +50,7 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
@Component
@RequiredArgsConstructor
@@ -85,23 +86,41 @@ public class DmiRestClient {
final String requestBodyAsJsonString,
final OperationType operationType,
final String authorization) {
- final WebClient webClient = requiredDmiService.equals(RequiredDmiService.DATA)
- ? dataServicesWebClient : modelServicesWebClient;
try {
- return webClient.post()
- .uri(toUri(dmiUrl))
- .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
- .body(BodyInserters.fromValue(requestBodyAsJsonString))
- .retrieve()
- .toEntity(Object.class)
- .onErrorMap(httpError -> handleDmiClientException(httpError, operationType.getOperationName()))
- .block();
+ return postOperationWithJsonDataAsync(requiredDmiService, dmiUrl, requestBodyAsJsonString, operationType,
+ authorization).block();
} catch (final HttpServerErrorException e) {
throw handleDmiClientException(e, operationType.getOperationName());
}
}
/**
+ * Asynchronously performs an HTTP POST operation with the given JSON data.
+ *
+ * @param requiredDmiService The service object required for retrieving or configuring the WebClient.
+ * @param dmiUrl The URL to which the POST request is sent.
+ * @param requestBodyAsJsonString The JSON string that will be sent as the request body.
+ * @param operationType An enumeration or object that holds information about the type of operation
+ * being performed.
+ * @param authorization The authorization token to be added to the request headers.
+ * @return A Mono emitting the response entity containing the server's response.
+ */
+ public Mono<ResponseEntity<Object>> postOperationWithJsonDataAsync(final RequiredDmiService requiredDmiService,
+ final String dmiUrl,
+ final String requestBodyAsJsonString,
+ final OperationType operationType,
+ final String authorization) {
+ final WebClient webClient = getWebClient(requiredDmiService);
+ return webClient.post()
+ .uri(toUri(dmiUrl))
+ .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
+ .body(BodyInserters.fromValue(requestBodyAsJsonString))
+ .retrieve()
+ .toEntity(Object.class)
+ .onErrorMap(throwable -> handleDmiClientException(throwable, operationType.getOperationName()));
+ }
+
+ /**
* Get DMI plugin health status.
*
* @param dmiUrl the base URL of the dmi-plugin
@@ -123,6 +142,10 @@ public class DmiRestClient {
}
}
+ private WebClient getWebClient(final RequiredDmiService requiredDmiService) {
+ return requiredDmiService.equals(RequiredDmiService.DATA) ? dataServicesWebClient : modelServicesWebClient;
+ }
+
private void configureHttpHeaders(final HttpHeaders httpHeaders, final String authorization) {
if (dmiProperties.isDmiBasicAuthEnabled()) {
httpHeaders.setBasicAuth(dmiProperties.getAuthUsername(), dmiProperties.getAuthPassword());
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
index 2c0b702627..08885a9e04 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/DmiWebClientConfiguration.java
@@ -106,13 +106,11 @@ public class DmiWebClientConfiguration {
final ConnectionProvider dmiWebClientConnectionProvider = ConnectionProvider.create(connectionProviderName,
maximumConnectionsTotal);
- final HttpClient httpClient = HttpClient.create(dmiWebClientConnectionProvider)
+ return HttpClient.create(dmiWebClientConnectionProvider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutInSeconds * 1000)
.doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler(readTimeoutInSeconds,
TimeUnit.SECONDS)).addHandlerLast(new WriteTimeoutHandler(writeTimeoutInSeconds,
TimeUnit.SECONDS)));
- httpClient.warmup().block();
- return httpClient;
}
private static WebClient buildAndGetWebClient(final HttpClient httpClient,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
index 978855569a..786160a964 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
import org.onap.cps.ncmp.api.impl.config.DmiProperties;
@@ -51,13 +50,14 @@ import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.util.UriComponentsBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* Operations class for DMI data.
*/
@RequiredArgsConstructor
@Service
-@Slf4j
public class DmiDataOperations {
private final InventoryPersistence inventoryPersistence;
@@ -231,28 +231,46 @@ public class DmiDataOperations {
groupsOutPerDmiServiceName,
final String authorization) {
- groupsOutPerDmiServiceName.forEach((dmiServiceName, dmiDataOperationRequestBodies) -> {
- final String dmiUrl = DmiServiceUrlBuilder.newInstance()
+ Flux.fromIterable(groupsOutPerDmiServiceName.entrySet())
+ .flatMap(dmiDataOperationsByDmiServiceName -> {
+ final String dmiServiceName = dmiDataOperationsByDmiServiceName.getKey();
+ final String dmiUrl = buildDmiServiceUrl(dmiServiceName, requestId, topicParamInQuery);
+ final List<DmiDataOperation> dmiDataOperationRequestBodies
+ = dmiDataOperationsByDmiServiceName.getValue();
+ return sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
+ })
+ .subscribe();
+ }
+
+ private String buildDmiServiceUrl(final String dmiServiceName, final String requestId,
+ final String topicParamInQuery) {
+ return DmiServiceUrlBuilder.newInstance()
.pathSegment("data")
.queryParameter("requestId", requestId)
.queryParameter("topic", topicParamInQuery)
.build(dmiServiceName, dmiProperties.getDmiBasePath());
- sendDataOperationRequestToDmiService(dmiUrl, dmiDataOperationRequestBodies, authorization);
- });
}
- private void sendDataOperationRequestToDmiService(final String dmiUrl,
- final List<DmiDataOperation> dmiDataOperationRequestBodies,
- final String authorization) {
+ private Mono<Void> sendDataOperationRequestToDmiService(final String dmiUrl,
+ final List<DmiDataOperation> dmiDataOperationRequestBodies,
+ final String authorization) {
+ final String dmiDataOperationRequestAsJsonString
+ = createDmiDataOperationRequestAsJsonString(dmiDataOperationRequestBodies);
+ return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, dmiDataOperationRequestAsJsonString,
+ READ, authorization)
+ .then()
+ .onErrorResume(DmiClientRequestException.class, dmiClientRequestException -> {
+ handleTaskCompletionException(dmiClientRequestException, dmiUrl, dmiDataOperationRequestBodies);
+ return Mono.empty();
+ });
+ }
+
+ private String createDmiDataOperationRequestAsJsonString(
+ final List<DmiDataOperation> dmiDataOperationRequestBodies) {
final DmiDataOperationRequest dmiDataOperationRequest = DmiDataOperationRequest.builder()
- .operations(dmiDataOperationRequestBodies).build();
- final String dmiDataOperationRequestAsJsonString = jsonObjectMapper.asJsonString(dmiDataOperationRequest);
- try {
- dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, dmiDataOperationRequestAsJsonString, READ,
- authorization);
- } catch (final DmiClientRequestException e) {
- handleTaskCompletionException(e, dmiUrl, dmiDataOperationRequestBodies);
- }
+ .operations(dmiDataOperationRequestBodies)
+ .build();
+ return jsonObjectMapper.asJsonString(dmiDataOperationRequest);
}
private void handleTaskCompletionException(final DmiClientRequestException dmiClientRequestException,
@@ -275,4 +293,4 @@ public class DmiDataOperations {
ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
cmHandleIdsPerResponseCodesPerOperation);
}
-}
+} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
index dc4108cac0..407fcf034e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -31,7 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -49,8 +48,8 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
public class ResourceDataOperationRequestUtils {
private static final String UNKNOWN_SERVICE_NAME = null;
@@ -126,58 +125,6 @@ public class ResourceDataOperationRequestUtils {
}
/**
- * Handles the async task completion for an entire data, publishing errors to client topic on task failure.
- *
- * @param topicParamInQuery client given topic
- * @param requestId unique identifier per request
- * @param dataOperationRequest incoming data operation request details
- * @param throwable error cause, or null if task completed with no exception
- */
- public static void handleAsyncTaskCompletionForDataOperationsRequest(
- final String topicParamInQuery,
- final String requestId,
- final DataOperationRequest dataOperationRequest,
- final Throwable throwable) {
- if (throwable == null) {
- log.info("Data operations request {} completed.", requestId);
- } else if (throwable instanceof TimeoutException) {
- log.error("Data operations request {} timed out.", requestId);
- ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
- requestId, dataOperationRequest, NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING);
- } else {
- log.error("Data operations request {} failed.", requestId, throwable);
- ResourceDataOperationRequestUtils.publishErrorMessageToClientTopicForEntireOperation(topicParamInQuery,
- requestId, dataOperationRequest, NcmpResponseStatus.UNKNOWN_ERROR);
- }
- }
-
- /**
- * Creates data operation cloud event for when the entire data operation fails and publishes it to client topic.
- *
- * @param topicParamInQuery client given topic
- * @param requestId unique identifier per request
- * @param dataOperationRequestIn incoming data operation request details
- * @param ncmpResponseStatus response code to be sent for all cm handle ids in all operations
- */
- private static void publishErrorMessageToClientTopicForEntireOperation(
- final String topicParamInQuery,
- final String requestId,
- final DataOperationRequest dataOperationRequestIn,
- final NcmpResponseStatus ncmpResponseStatus) {
-
- final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
- cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
-
- for (final DataOperationDefinition dataOperationDefinitionIn :
- dataOperationRequestIn.getDataOperationDefinitions()) {
- cmHandleIdsPerResponseCodesPerOperation.add(
- DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
- Map.of(ncmpResponseStatus, dataOperationDefinitionIn.getCmHandleIds()));
- }
- publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperation);
- }
-
- /**
* Creates data operation cloud event and publish it to client topic.
*
* @param clientTopic client given topic
@@ -193,6 +140,8 @@ public class ResourceDataOperationRequestUtils {
final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
requestId, cmHandleIdsPerResponseCodesPerOperation);
final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
+ log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
+ clientTopic, requestId, dataOperationCloudEvent.getId());
eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
index ad3f85c84a..a861809c64 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
@@ -21,6 +21,8 @@
package org.onap.cps.ncmp.api.impl.operations
+import reactor.core.publisher.Mono
+
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
@@ -100,14 +102,14 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId]
and: 'a positive response from DMI service when it is called with valid request parameters'
- def responseFromDmi = new ResponseEntity<Object>(HttpStatus.ACCEPTED)
+ def responseFromDmi = Mono.just(new ResponseEntity<Object>(HttpStatus.ACCEPTED))
def expectedDmiBatchResourceDataUrl = "someServiceName/dmi/v1/data?requestId=requestId&topic=my-topic-name"
def expectedBatchRequestAsJson = '{"operations":[{"operation":"read","operationId":"operational-14","datastore":"ncmp-datastore:passthrough-operational","options":"some option","resourceIdentifier":"some resource identifier","cmHandles":[{"id":"some-cm-handle","moduleSetTag":"","cmHandleProperties":{"prop1":"val1"}}]}]}'
- mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi
- when: 'get resource data for group of cm handles are invoked'
+ mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, _, READ, NO_AUTH_HEADER) >> responseFromDmi
+ when: 'get resource data for group of cm handles is invoked'
objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER)
- then: 'the post operation was called and ncmp generated dmi request body json args'
- 1 * mockDmiRestClient.postOperationWithJsonData(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER)
+ then: 'the post operation was called with the expected URL and JSON request body'
+ 1 * mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedDmiBatchResourceDataUrl, expectedBatchRequestAsJson, READ, NO_AUTH_HEADER)
}
def 'Execute (async) data operation from DMI service with Exception.'() {
@@ -116,12 +118,12 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
dataOperationRequest.dataOperationDefinitions[0].cmHandleIds = [cmHandleId]
- and: 'the published cloud will be captured'
+ and: 'the published cloud event will be captured'
def actualDataOperationCloudEvent = null
eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
- and: 'a positive response from DMI service when it is called with valid request parameters'
- mockDmiRestClient.postOperationWithJsonData(*_) >> { throw new DmiClientRequestException(123,'','', UNKNOWN_ERROR) }
- when: 'attempt tp get resource data for group of cm handles are invoked'
+ and: 'a DMI client request exception is thrown when DMI service is called'
+ mockDmiRestClient.postOperationWithJsonDataAsync(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
+ when: 'attempt to get resource data for group of cm handles is invoked'
objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'my-request-id', NO_AUTH_HEADER)
then: 'the event contains the expected error details'
def eventDataValue = extractDataValue(actualDataOperationCloudEvent)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
index 9028b9e5ed..6530685927 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtilsSpec.groovy
@@ -20,16 +20,20 @@
package org.onap.cps.ncmp.api.impl.utils.data.operation
+import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
+import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.ADVISED
+import static org.onap.cps.ncmp.api.impl.inventory.CmHandleState.READY
+
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.cps.events.EventsPublisher
-import org.onap.cps.ncmp.api.NcmpResponseStatus
+import org.onap.cps.ncmp.api.impl.operations.DmiDataOperation
+import org.onap.cps.ncmp.api.impl.operations.OperationType
import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.impl.inventory.CmHandleState
import org.onap.cps.ncmp.api.impl.inventory.CompositeStateBuilder
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.api.models.DataOperationRequest
@@ -37,15 +41,11 @@ import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
-
+import org.springframework.util.LinkedMultiValueMap
import java.time.Duration
-import java.util.concurrent.TimeoutException
-
-import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, ObjectMapper])
+@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext])
class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
def static clientTopic = 'my-topic-name'
@@ -57,9 +57,6 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
@SpringBean
EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
- @Autowired
- ObjectMapper objectMapper
-
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
@@ -135,34 +132,10 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
}
- def 'Publish error response for entire data operations request when async task fails'() {
- given: 'consumer subscribing to client topic'
- def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties(consumerGroupId, CloudEventDeserializer))
- cloudEventKafkaConsumer.subscribe([clientTopic])
- and: 'data operation request having non-ready and non-existing cm handle ids'
- def dataOperationRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
- def dataOperationRequest = jsonObjectMapper.convertJsonString(dataOperationRequestJsonData, DataOperationRequest.class)
- when: 'an error occurs for the entire data operations request'
- ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(clientTopic, 'request-id', dataOperationRequest, exceptionThrown)
- and: 'subscribed client specified topic is polled and first record is selected'
- def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).last()
- def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
- then: 'data operation response event response size is 3'
- dataOperationResponseEvent.data.responses.size() == 3
- and: 'all 3 have the expected error code'
- dataOperationResponseEvent.data.responses.each {
- assert it.statusCode == errorReportedToClientTopic.code
- }
- where:
- scenario | exceptionThrown | consumerGroupId || errorReportedToClientTopic
- 'task timed out' | new TimeoutException() | 'test-2' || NcmpResponseStatus.DMI_SERVICE_NOT_RESPONDING
- 'unspecified error' | new RuntimeException() | 'test-3' || NcmpResponseStatus.UNKNOWN_ERROR
- }
-
static def getYangModelCmHandles() {
def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
- def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
- def advisedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.ADVISED).withLastUpdatedTimeNow().build()
+ def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
+ def advisedState = new CompositeStateBuilder().withCmHandleState(ADVISED).withLastUpdatedTimeNow().build()
return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
new YangModelCmHandle(id: 'ch2-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
new YangModelCmHandle(id: 'ch6-dmi1', dmiServiceName: 'dmi1', dmiProperties: dmiProperties, compositeState: readyState),
@@ -176,7 +149,16 @@ class ResourceDataOperationRequestUtilsSpec extends MessagingBaseSpec {
static def getYangModelCmHandlesForOneCmHandle() {
def dmiProperties = [new YangModelCmHandle.Property('prop', 'some DMI property')]
- def readyState = new CompositeStateBuilder().withCmHandleState(CmHandleState.READY).withLastUpdatedTimeNow().build()
+ def readyState = new CompositeStateBuilder().withCmHandleState(READY).withLastUpdatedTimeNow().build()
return [new YangModelCmHandle(id: 'ch1-dmi1', dmiServiceName: 'dmi1', moduleSetTag: 'module-set-tag1', dmiProperties: dmiProperties, compositeState: readyState)]
}
+
+ def mockAndPopulateErrorMap(errorReportedToClientTopic) {
+ def dmiDataOperation = DmiDataOperation.builder().operation(OperationType.fromOperationName('read'))
+ .operationId('some-op-id').datastore('ncmp-datastore:passthrough-operational')
+ .options('some-option').resourceIdentifier('some-resource-identifier').build()
+ def cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>()
+ cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperation, Map.of(errorReportedToClientTopic, ['some-cm-handle-id']))
+ return cmHandleIdsPerResponseCodesPerOperation
+ }
}