diff options
16 files changed, 162 insertions, 344 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java index 45c7c33fd2..5b54ac243e 100755 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java @@ -423,7 +423,5 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { } return ncmpPassthroughResourceRequestHandler; } - - } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java index e6d6faf983..80e1c442e9 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java @@ -20,14 +20,15 @@ package org.onap.cps.ncmp.rest.controller.handlers; -import java.util.function.Supplier; +import java.util.Collection; import org.onap.cps.ncmp.api.NetworkCmProxyDataService; import org.onap.cps.ncmp.api.NetworkCmProxyQueryService; import org.onap.cps.ncmp.api.models.CmResourceAddress; -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.spi.FetchDescendantsOption; +import org.onap.cps.spi.model.DataNode; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; @Component public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandler { @@ -38,14 +39,11 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle /** * Constructor. * - * @param cpsNcmpTaskExecutor @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor * @param networkCmProxyDataService @see org.onap.cps.ncmp.api.NetworkCmProxyDataService * @param networkCmProxyQueryService @see org.onap.cps.ncmp.api.NetworkCmProxyQueryService */ - public NcmpCachedResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor, - final NetworkCmProxyDataService networkCmProxyDataService, + public NcmpCachedResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService, final NetworkCmProxyQueryService networkCmProxyQueryService) { - super(cpsNcmpTaskExecutor); this.networkCmProxyDataService = networkCmProxyDataService; this.networkCmProxyQueryService = networkCmProxyQueryService; } @@ -59,35 +57,30 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle * @param includeDescendants whether include descendants * @return the response entity */ - public ResponseEntity<Object> executeRequest(final String cmHandleId, - final String resourceIdentifier, + public ResponseEntity<Object> executeRequest(final String cmHandleId, final String resourceIdentifier, final boolean includeDescendants) { - - final Supplier<Object> taskSupplier = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier, - includeDescendants); - return executeTaskSync(taskSupplier); + final Collection<DataNode> dataNodes = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier, + includeDescendants); + return ResponseEntity.ok(dataNodes); } @Override - protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final String requestId, - final boolean includeDescendants, - final String authorization) { - + protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final String requestId, + final boolean includeDescendants, + final String authorization) { final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants); - - return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption); + return Mono.fromSupplier( + () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption)); } - private Supplier<Object> getTaskSupplierForQueryRequest(final String cmHandleId, - final String resourceIdentifier, - final boolean includeDescendants) { - + private Collection<DataNode> getTaskSupplierForQueryRequest(final String cmHandleId, + final String resourceIdentifier, + final boolean includeDescendants) { final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants); - - return () -> networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier, + return networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier, fetchDescendantsOption); } @@ -95,6 +88,4 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle return includeDescendants ? FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS : FetchDescendantsOption.OMIT_DESCENDANTS; } - - } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java index 1ae16820a1..20059e20f4 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java @@ -22,15 +22,14 @@ package org.onap.cps.ncmp.rest.controller.handlers; import java.util.Map; import java.util.UUID; -import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.models.CmResourceAddress; -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.ncmp.rest.util.TopicValidator; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; @Slf4j @Service @@ -42,12 +41,9 @@ public abstract class NcmpDatastoreRequestHandler { @Value("${notification.async.executor.time-out-value-in-ms:60000}") protected int timeOutInMilliSeconds; - @Value("${notification.enabled:true}") protected boolean notificationFeatureEnabled; - protected final CpsNcmpTaskExecutor cpsNcmpTaskExecutor; - /** * Executes synchronous/asynchronous get request for given cm handle. * @@ -66,7 +62,7 @@ public abstract class NcmpDatastoreRequestHandler { final boolean asyncResponseRequested = topicParamInQuery != null; if (asyncResponseRequested && notificationFeatureEnabled) { - return executeAsyncTaskAndGetResponseEntity(cmResourceAddress, optionsParamInQuery, topicParamInQuery, + return fetchResourceDataAsynchronously(cmResourceAddress, optionsParamInQuery, topicParamInQuery, includeDescendants, authorization); } @@ -74,41 +70,36 @@ public abstract class NcmpDatastoreRequestHandler { log.warn("Asynchronous request is unavailable as notification feature is currently disabled, " + "will use synchronous operation."); } - final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress, optionsParamInQuery, - NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization); - return executeTaskSync(taskSupplier); + final Mono<Object> resourceDataMono = getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, + NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization); + return fetchResourceDataSynchronously(resourceDataMono); } + private ResponseEntity<Object> fetchResourceDataSynchronously(final Mono<Object> resourceDataMono) { + return ResponseEntity.ok(resourceDataMono.block()); + } - private ResponseEntity<Object> executeTaskAsync(final String topicParamInQuery, - final String requestId, - final Supplier<Object> taskSupplier) { + private ResponseEntity<Object> fetchResourceDataAsynchronously(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final boolean includeDescendants, + final String authorization) { TopicValidator.validateTopicName(topicParamInQuery); + final String requestId = UUID.randomUUID().toString(); + getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, topicParamInQuery, requestId, + includeDescendants, authorization) + .doOnSuccess(result -> log.debug("Async operation succeeded for request id {}: {}", requestId, result)) + .doOnError(error -> + log.error("Async operation failed for request id {}: {}", requestId, error.getMessage())) + .subscribe(); log.debug("Received Async request with id {}", requestId); - cpsNcmpTaskExecutor.executeTask(taskSupplier, timeOutInMilliSeconds); return ResponseEntity.ok(Map.of("requestId", requestId)); } - protected ResponseEntity<Object> executeTaskSync(final Supplier<Object> taskSupplier) { - return ResponseEntity.ok(taskSupplier.get()); - } - - private ResponseEntity<Object> executeAsyncTaskAndGetResponseEntity(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final boolean includeDescendants, - final String authorization) { - final String requestId = UUID.randomUUID().toString(); - final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress, - optionsParamInQuery, topicParamInQuery, requestId, includeDescendants, authorization); - return executeTaskAsync(topicParamInQuery, requestId, taskSupplier); - } - - protected abstract Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final String requestId, - final boolean includeDescendant, - final String authorization); - + protected abstract Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final String requestId, + final boolean includeDescendant, + final String authorization); } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java index d716877d54..53e374d30a 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java @@ -25,7 +25,6 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ; import java.util.Map; import java.util.UUID; -import java.util.function.Supplier; import org.onap.cps.ncmp.api.NetworkCmProxyDataService; import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException; import org.onap.cps.ncmp.api.impl.operations.DatastoreType; @@ -34,12 +33,12 @@ import org.onap.cps.ncmp.api.models.CmResourceAddress; import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException; import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException; -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.ncmp.rest.util.TopicValidator; import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; -@Component +@Service public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler { private final NetworkCmProxyDataService networkCmProxyDataService; @@ -49,12 +48,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH /** * Constructor. * - * @param cpsNcmpTaskExecutor @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor * @param networkCmProxyDataService @see org.onap.cps.ncmp.api.NetworkCmProxyDataService */ - public NcmpPassthroughResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor, - final NetworkCmProxyDataService networkCmProxyDataService) { - super(cpsNcmpTaskExecutor); + public NcmpPassthroughResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService) { this.networkCmProxyDataService = networkCmProxyDataService; } @@ -79,15 +75,14 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH } @Override - protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final String requestId, - final boolean includeDescendants, - final String authorization) { - - return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, - topicParamInQuery, requestId, authorization); + protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final String requestId, + final boolean includeDescendants, + final String authorization) { + return networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, + topicParamInQuery, requestId, authorization); } private ResponseEntity<Object> getRequestIdAndSendDataOperationRequestToDmiService( diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java deleted file mode 100644 index 2601c7a5b3..0000000000 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.rest.executor; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; -import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -@Slf4j -@Service -public class CpsNcmpTaskExecutor { - - /** - * Execute a task asynchronously, and invoke completion handler when done. - * - * @param taskSupplier functional method is get() task needed to be executed asynchronously - * @param taskCompletionHandler the action to perform on task completion or error - * @param timeOutInMillis the time-out value in milliseconds - */ - public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier, - final BiConsumer<Object, Throwable> taskCompletionHandler, - final long timeOutInMillis) { - CompletableFuture.supplyAsync(taskSupplier) - .orTimeout(timeOutInMillis, MILLISECONDS) - .whenCompleteAsync(taskCompletionHandler); - } - - /** - * Execute a task asynchronously. - * - * @param taskSupplier functional method is get() task needed to be executed asynchronously - * @param timeOutInMillis the time-out value in milliseconds - */ - public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) { - executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable), - timeOutInMillis); - } - - private void handleTaskCompletion(final Throwable throwable) { - if (throwable == null) { - log.info("Async task completed successfully."); - } else { - log.error("Async task failed. caused by : {}", throwable.toString()); - } - } -} - - - diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index c9dbc291c6..3a5748f002 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -59,7 +59,6 @@ import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle import org.onap.cps.ncmp.api.models.CmResourceAddress import org.onap.cps.ncmp.rest.controller.handlers.NcmpCachedResourceRequestHandler import org.onap.cps.ncmp.rest.controller.handlers.NcmpPassthroughResourceRequestHandler -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor import org.onap.cps.ncmp.rest.mapper.CmHandleStateMapper import org.onap.cps.ncmp.rest.mapper.DataOperationRequestMapper import org.onap.cps.ncmp.rest.model.DataOperationDefinition @@ -82,6 +81,9 @@ import spock.lang.Specification import java.time.OffsetDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter +import groovy.json.JsonSlurper +import org.springframework.http.ResponseEntity +import reactor.core.publisher.Mono @WebMvcTest(NetworkCmProxyController) class NetworkCmProxyControllerSpec extends Specification { @@ -114,16 +116,13 @@ class NetworkCmProxyControllerSpec extends Specification { Map<String, TrustLevel> trustLevelPerCmHandle = [:] @SpringBean - CpsNcmpTaskExecutor mockCpsTaskExecutor = Mock() - - @SpringBean DeprecationHelper stubbedDeprecationHelper = Stub() @SpringBean - NcmpCachedResourceRequestHandler ncmpCachedResourceRequestHandler = new NcmpCachedResourceRequestHandler(mockCpsTaskExecutor, mockNetworkCmProxyDataService, mockNetworkCmProxyQueryService) + NcmpCachedResourceRequestHandler ncmpCachedResourceRequestHandler = new NcmpCachedResourceRequestHandler(mockNetworkCmProxyDataService, mockNetworkCmProxyQueryService) @SpringBean - NcmpPassthroughResourceRequestHandler ncmpPassthroughResourceRequestHandler = new NcmpPassthroughResourceRequestHandler(mockCpsTaskExecutor, mockNetworkCmProxyDataService) + NcmpPassthroughResourceRequestHandler ncmpPassthroughResourceRequestHandler = new NcmpPassthroughResourceRequestHandler(mockNetworkCmProxyDataService) @Value('${rest.api.ncmp-base-path}/v1') def ncmpBasePathV1 @@ -160,7 +159,7 @@ class NetworkCmProxyControllerSpec extends Specification { when: 'get data resource request is performed' def response = mvc.perform(get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response then: 'the NCMP data service is called with correct parameters' - 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress, '(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) + 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress, '(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) >> Mono.just(new ResponseEntity<Object>(HttpStatus.OK)) and: 'response status is Ok' assert response.status == HttpStatus.OK.value() } @@ -253,13 +252,15 @@ class NetworkCmProxyControllerSpec extends Specification { def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=$resourceIdentifier&options=(a=1,b=2)" and: 'ncmp service returns json object' def expectedCmResourceAddress = new CmResourceAddress(PASSTHROUGH_RUNNING.datastoreName, 'testCmHandle', resourceIdentifier) - mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress,'(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) >> '{valid-json}' + 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(expectedCmResourceAddress, '(a=1,b=2)', NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) + >> Mono.just(new ResponseEntity<Object>('{valid-json}', HttpStatus.OK)) when: 'get data resource request is performed' def response = mvc.perform(get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response then: 'response status is Ok' - response.status == HttpStatus.OK.value() - and: 'response contains valid object body' - response.getContentAsString() == '{valid-json}' + assert response.status == 200 + and: 'response contains the object returned by the service' + def responseAsJsonObject = new JsonSlurper().parseText(response.getContentAsString()) + assert responseAsJsonObject.body == '{valid-json}' where: 'tokens are used in the resource identifier parameter' scenario | resourceIdentifier '/' | 'id/with/slashes' @@ -452,6 +453,8 @@ class NetworkCmProxyControllerSpec extends Specification { def 'Get resource data from DMI with valid topic i.e. async request for #scenario'() { given: 'resource data url' def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:${datastoreInUrl}?resourceIdentifier=parent/child&options=(a=1,b=2)&topic=my-topic-name" + and: 'the NCMP data service is called with correct parameters' + 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(_, '(a=1,b=2)', 'my-topic-name', _, NO_AUTH_HEADER) >> Mono.just(new ResponseEntity<Object>(HttpStatus.OK)) when: 'get data resource request is performed' def response = mvc.perform(get(getUrl).contentType(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON_VALUE)).andReturn().response then: 'async request id is generated' diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy index 8835c99fb1..00b0cb04c5 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy @@ -28,16 +28,16 @@ import org.onap.cps.ncmp.api.models.DataOperationRequest import org.onap.cps.ncmp.api.models.CmResourceAddress import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor +import org.springframework.http.HttpStatus +import reactor.core.publisher.Mono import spock.lang.Specification import spock.util.concurrent.PollingConditions class NcmpDatastoreRequestHandlerSpec extends Specification { - def spiedCpsNcmpTaskExecutor = Spy(CpsNcmpTaskExecutor) def mockNetworkCmProxyDataService = Mock(NetworkCmProxyDataService) - def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService) + def objectUnderTest = new NcmpPassthroughResourceRequestHandler(mockNetworkCmProxyDataService) def NO_AUTH_HEADER = null @@ -48,32 +48,26 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { def 'Attempt to execute async get request with #scenario.'() { given: 'notification feature is turned on/off' objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled - and: 'a flag to track the network service call' - def networkServiceMethodCalled = false and: 'a CM resource address' def cmResourceAddress = new CmResourceAddress('ds', 'ch1', 'resource1') - and: 'the (mocked) service will use the flag to indicate if it is called' - mockNetworkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, 'options', _, _, NO_AUTH_HEADER) >> - { networkServiceMethodCalled = true } + and: 'the (mocked) service is called with the correct parameters returns OK' + 1 * mockNetworkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, 'options', _, _, NO_AUTH_HEADER) >> Mono.just(HttpStatus.OK) when: 'get request is executed with topic = #topic' - objectUnderTest.executeRequest(cmResourceAddress, 'options', topic, false, NO_AUTH_HEADER) - then: 'the task is executed in an async fashion or not' - expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_) - and: 'the service request is invoked' - new PollingConditions().within(1) { - assert networkServiceMethodCalled == true - } + def response= objectUnderTest.executeRequest(cmResourceAddress, 'options', topic, false, NO_AUTH_HEADER) + then: 'a successful response with/without request id is returned' + assert response.statusCode.value == 200 + assert response.body instanceof Map == expectedResponseBodyIsMap where: 'the following parameters are used' - scenario | notificationFeatureEnabled | topic || expectedCalls - 'feature on, valid topic' | true | 'valid' || 1 - 'feature on, no topic' | true | null || 0 - 'feature off, valid topic' | false | 'valid' || 0 - 'feature off, no topic' | false | null || 0 + scenario | notificationFeatureEnabled | topic || expectedCalls | expectedResponseBodyIsMap + 'feature on, valid topic' | true | 'valid' || 1 | true + 'feature on, no topic' | true | null || 0 | false + 'feature off, valid topic' | false | 'valid' || 0 | false + 'feature off, no topic' | false | null || 0 | false } def 'Attempt to execute async data operation request with feature #scenario.'() { given: 'a extended request handler that supports bulk requests' - def objectUnderTest = new NcmpPassthroughResourceRequestHandler(spiedCpsNcmpTaskExecutor, mockNetworkCmProxyDataService) + def objectUnderTest = new NcmpPassthroughResourceRequestHandler(mockNetworkCmProxyDataService) and: 'notification feature is turned on/off' objectUnderTest.notificationFeatureEnabled = notificationFeatureEnabled when: 'data operation request is executed' diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy index ea472cd931..33eb48ffa2 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy @@ -43,7 +43,6 @@ import org.onap.cps.ncmp.api.impl.exception.ServerNcmpException import org.onap.cps.ncmp.rest.controller.NcmpRestInputMapper import org.onap.cps.ncmp.rest.controller.handlers.NcmpCachedResourceRequestHandler import org.onap.cps.ncmp.rest.controller.handlers.NcmpPassthroughResourceRequestHandler -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor import org.onap.cps.ncmp.rest.mapper.CmHandleStateMapper import org.onap.cps.ncmp.rest.mapper.DataOperationRequestMapper import org.onap.cps.ncmp.rest.util.DeprecationHelper @@ -83,9 +82,6 @@ class NetworkCmProxyRestExceptionHandlerSpec extends Specification { DataOperationRequestMapper dataOperationRequestMapper = Mappers.getMapper(DataOperationRequestMapper) @SpringBean - CpsNcmpTaskExecutor stubbedCpsTaskExecutor = Stub() - - @SpringBean DeprecationHelper stubbedDeprecationHelper = Stub() @SpringBean diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy deleted file mode 100644 index 4c8c40f4e6..0000000000 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy +++ /dev/null @@ -1,99 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.rest.executor - -import ch.qos.logback.classic.Level -import ch.qos.logback.classic.Logger -import ch.qos.logback.classic.spi.ILoggingEvent -import ch.qos.logback.core.read.ListAppender -import org.slf4j.LoggerFactory -import spock.lang.Specification -import spock.util.concurrent.PollingConditions - -class CpsNcmpTaskExecutorSpec extends Specification { - - def objectUnderTest = new CpsNcmpTaskExecutor() - def logger = Spy(ListAppender<ILoggingEvent>) - def enoughTime = 100 - def notEnoughTime = 10 - - void setup() { - ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).addAppender(logger) - logger.start() - } - - void cleanup() { - ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).detachAndStopAllAppenders() - } - - def 'Execute successful task.'() { - when: 'task is executed' - objectUnderTest.executeTask(taskSupplier(), enoughTime) - then: 'an event is logged with level INFO' - new PollingConditions().within(1) { - def loggingEvent = getLoggingEvent() - assert loggingEvent.level == Level.INFO - } - and: 'the log indicates the task completed successfully' - assert loggingEvent.formattedMessage == 'Async task completed successfully.' - } - - def 'Execute failing task.'() { - when: 'task is executed' - objectUnderTest.executeTask(taskSupplierForFailingTask(), enoughTime) - then: 'an event is logged with level ERROR' - new PollingConditions().within(1) { - def loggingEvent = getLoggingEvent() - assert loggingEvent.level == Level.ERROR - } - and: 'the original error message is logged' - assert loggingEvent.formattedMessage.contains('original exception message') - } - - def 'Task times out.'() { - when: 'task is executed without enough time to complete' - objectUnderTest.executeTask(taskSupplierForLongRunningTask(), notEnoughTime) - then: 'an event is logged with level ERROR' - new PollingConditions().within(1) { - def loggingEvent = getLoggingEvent() - assert loggingEvent.level == Level.ERROR - } - and: 'a timeout error message is logged' - assert loggingEvent.formattedMessage.contains('java.util.concurrent.TimeoutException') - } - - def taskSupplier() { - return () -> 'hello world' - } - - def taskSupplierForFailingTask() { - return () -> { throw new RuntimeException('original exception message') } - } - - def taskSupplierForLongRunningTask() { - return () -> { sleep(enoughTime) } - } - - def getLoggingEvent() { - return logger.list[0] - } - -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java index 20545d711d..73c8d96096 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyDataService.java @@ -37,6 +37,7 @@ import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.model.ModuleDefinition; import org.onap.cps.spi.model.ModuleReference; +import reactor.core.publisher.Mono; /* * Datastore interface for handling CPS data. @@ -52,20 +53,29 @@ public interface NetworkCmProxyDataService { DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule(DmiPluginRegistration dmiPluginRegistration); /** - * Get resource data for given data store using dmi. - * - * @param cmResourceAddress target datastore, cm handle and resource identifier - * @param optionsParamInQuery options query - * @param topicParamInQuery topic name for (triggering) async responses - * @param requestId unique requestId for async request - * @param authorization contents of Authorization header, or null if not present - * @return {@code Object} resource data - */ - Object getResourceDataForCmHandle(CmResourceAddress cmResourceAddress, - String optionsParamInQuery, - String topicParamInQuery, - String requestId, - String authorization); + * Fetches resource data for a given data store using DMI (Data Management Interface). + * This method retrieves data based on the provided CmResourceAddress and additional query parameters. + * It supports asynchronous processing and handles authorization if required. + * + * @param cmResourceAddress The target data store, including the CM handle and resource identifier. + * This parameter must not be null. + * @param optionsParamInQuery Additional query parameters that may influence the data retrieval process, + * such as filters or limits. This parameter can be null. + * @param topicParamInQuery The topic name for triggering asynchronous responses. If specified, + * the response will be sent to this topic. This parameter can be null. + * @param requestId A unique identifier for the request, used for tracking and correlating + * asynchronous operations. This parameter must not be null. + * @param authorization The contents of the Authorization header. This parameter can be null + * if authorization is not required. + * @return {@code Mono<Object>} A reactive Mono that emits the resource data on successful retrieval + * or an error signal if the operation fails. The Mono represents a single asynchronous + * computation result. + */ + Mono<Object> getResourceDataForCmHandle(CmResourceAddress cmResourceAddress, + String optionsParamInQuery, + String topicParamInQuery, + String requestId, + String authorization); /** * Get resource data for operational. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyQueryService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyQueryService.java index 340806b892..39d497217f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyQueryService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NetworkCmProxyQueryService.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,9 @@ package org.onap.cps.ncmp.api; +import java.util.Collection; import org.onap.cps.spi.FetchDescendantsOption; +import org.onap.cps.spi.model.DataNode; /* * Datastore interface for handling cached CPS data query requests. @@ -28,14 +30,21 @@ import org.onap.cps.spi.FetchDescendantsOption; public interface NetworkCmProxyQueryService { /** - * Get resource data for operational. + * Fetches operational resource data based on the provided CM handle identifier and CPS path. + * This method retrieves data nodes from the specified path within the context of a given CM handle. + * It supports options for fetching descendant nodes. * - * @param cmHandleId cm handle identifier - * @param cpsPath cps path - * @Link FetchDescendantsOption fetch descendants option - * @return {@code Object} resource data + * @param cmHandleId The CM handle identifier, which uniquely identifies the CM handle. + * This parameter must not be null. + * @param cpsPath The CPS (Control Plane Service) path specifying the location of the + * resource data within the CM handle. This parameter must not be null. + * @param fetchDescendantsOption The option specifying whether to fetch descendant nodes along with the specified + * resource data. + * @return {@code Collection<DataNode>} A collection of DataNode objects representing the resource data + * retrieved from the specified path. The collection may include descendant nodes based on the + * fetchDescendantsOption. */ - Object queryResourceDataOperational(String cmHandleId, - String cpsPath, - FetchDescendantsOption fetchDescendantsOption); + Collection<DataNode> queryResourceDataOperational(String cmHandleId, + String cpsPath, + FetchDescendantsOption fetchDescendantsOption); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index 17b3d7ab1e..754050947a 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -87,8 +87,8 @@ import org.onap.cps.spi.model.ModuleDefinition; import org.onap.cps.spi.model.ModuleReference; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; @Slf4j @Service @@ -133,17 +133,14 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService } @Override - public Object getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + public Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, final String optionsParamInQuery, final String topicParamInQuery, final String requestId, final String authorization) { - final ResponseEntity<?> responseEntity = dmiDataOperations.getResourceDataFromDmi(cmResourceAddress, - optionsParamInQuery, - topicParamInQuery, - requestId, - authorization); - return responseEntity.getBody(); + return dmiDataOperations.getResourceDataFromDmi(cmResourceAddress, optionsParamInQuery, topicParamInQuery, + requestId, authorization) + .flatMap(responseEntity -> Mono.justOrEmpty(responseEntity.getBody())); } @Override diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyQueryServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyQueryServiceImpl.java index d8353f3029..8d3b6ed8f7 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyQueryServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyQueryServiceImpl.java @@ -22,11 +22,13 @@ package org.onap.cps.ncmp.api.impl; import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME; +import java.util.Collection; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsQueryService; import org.onap.cps.ncmp.api.NetworkCmProxyQueryService; import org.onap.cps.spi.FetchDescendantsOption; +import org.onap.cps.spi.model.DataNode; import org.springframework.stereotype.Service; @Slf4j @@ -37,9 +39,9 @@ public class NetworkCmProxyQueryServiceImpl implements NetworkCmProxyQueryServic private final CpsQueryService cpsQueryService; @Override - public Object queryResourceDataOperational(final String cmHandleId, - final String cpsPath, - final FetchDescendantsOption fetchDescendantsOption) { + public Collection<DataNode> queryResourceDataOperational(final String cmHandleId, + final String cpsPath, + final FetchDescendantsOption fetchDescendantsOption) { return cpsQueryService.queryDataNodes(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, cmHandleId, cpsPath, fetchDescendantsOption); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java index 786160a964..3db84556e9 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java @@ -66,20 +66,20 @@ public class DmiDataOperations { private final DmiRestClient dmiRestClient; /** - * This method fetches the resource data from operational data store for given cm handle - * identifier on given resource using dmi client. + * This method fetches the resource data from the operational data store for a given CM handle + * identifier on the specified resource using the DMI client. * - * @param cmResourceAddress target datastore, cm handle and resource identifier - * @param optionsParamInQuery options query - * @param topicParamInQuery topic name for (triggering) async responses - * @param requestId requestId for async responses - * @param authorization contents of Authorization header, or null if not present - * @return {@code ResponseEntity} response entity + * @param cmResourceAddress Target datastore, CM handle, and resource identifier. + * @param optionsParamInQuery Options query string. + * @param topicParamInQuery Topic name for triggering asynchronous responses. + * @param requestId Request ID for asynchronous responses. + * @param authorization Contents of the Authorization header, or null if not present. + * @return {@code Mono<ResponseEntity<Object>>} A reactive type representing the response entity. */ @Timed(value = "cps.ncmp.dmi.get", description = "Time taken to fetch the resource data from operational data store for given cm handle " + "identifier on given resource using dmi client") - public ResponseEntity<Object> getResourceDataFromDmi(final CmResourceAddress cmResourceAddress, + public Mono<ResponseEntity<Object>> getResourceDataFromDmi(final CmResourceAddress cmResourceAddress, final String optionsParamInQuery, final String topicParamInQuery, final String requestId, @@ -90,7 +90,7 @@ public class DmiDataOperations { final String jsonRequestBody = getDmiRequestBody(READ, requestId, null, null, yangModelCmHandle); final String dmiUrl = getDmiResourceDataUrl(cmResourceAddress.datastoreName(), yangModelCmHandle, cmResourceAddress.resourceIdentifier(), optionsParamInQuery, topicParamInQuery); - return dmiRestClient.postOperationWithJsonData(DATA, dmiUrl, jsonRequestBody, READ, authorization); + return dmiRestClient.postOperationWithJsonDataAsync(DATA, dmiUrl, jsonRequestBody, READ, authorization); } /** diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy index 4d0af6f490..d91c79d33d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy @@ -23,6 +23,8 @@ package org.onap.cps.ncmp.api.impl +import reactor.core.publisher.Mono + import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR @@ -120,16 +122,16 @@ class NetworkCmProxyDataServiceImplSpec extends Specification { >> { new ResponseEntity<>(HttpStatus.CREATED) } } - def 'Get resource data for from DMI.'() { + def 'Get resource data from DMI.'() { given: 'cpsDataService returns valid data node' mockDataNode() and: 'some cm resource address' - def cmResourceAddress = new CmResourceAddress('some datastore','some CM Handle', 'some resource Id') + def cmResourceAddress = new CmResourceAddress('some datastore', 'some CM Handle', 'some resource Id') and: 'get resource data from DMI is called' mockDmiDataOperations.getResourceDataFromDmi(cmResourceAddress, OPTIONS_PARAM, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) >> - new ResponseEntity<>('dmi-response', HttpStatus.OK) + Mono.just(new ResponseEntity<>('dmi-response', HttpStatus.OK)) when: 'get resource data operational for the given cm resource address is called' - def response = objectUnderTest.getResourceDataForCmHandle(cmResourceAddress, OPTIONS_PARAM, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) + def response = objectUnderTest.getResourceDataForCmHandle(cmResourceAddress, OPTIONS_PARAM, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER).block() then: 'DMI returns a json response' assert response == 'dmi-response' } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy index a861809c64..b286e9fb10 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy @@ -21,8 +21,6 @@ package org.onap.cps.ncmp.api.impl.operations -import reactor.core.publisher.Mono - import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING @@ -50,6 +48,7 @@ import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity import org.springframework.test.context.ContextConfiguration import spock.lang.Shared +import reactor.core.publisher.Mono @SpringBootTest @ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, DmiProperties, DmiDataOperations]) @@ -76,15 +75,16 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { given: 'a cm handle for #cmHandleId' mockYangModelCmHandleRetrieval(dmiProperties) and: 'a positive response from DMI service when it is called with the expected parameters' - def responseFromDmi = new ResponseEntity<Object>(HttpStatus.OK) + def responseFromDmi = Mono.just(new ResponseEntity<Object>('{some-key:some-value}', HttpStatus.OK)) def expectedUrl = "${dmiServiceBaseUrl}${expectedDatastoreInUrl}?resourceIdentifier=${resourceIdentifier}${expectedOptionsInUrl}" - def expectedJson = '{"operation":"read","cmHandleProperties":' + expectedProperties + ',"moduleSetTag":""}' - mockDmiRestClient.postOperationWithJsonData(DATA, expectedUrl, expectedJson, READ, NO_AUTH_HEADER) >> responseFromDmi + def expectedJson = '{"operation":"read","cmHandleProperties":' + expectedProperties + ',"moduleSetTag":""}' + mockDmiRestClient.postOperationWithJsonDataAsync(DATA, expectedUrl, expectedJson, READ, NO_AUTH_HEADER) >> responseFromDmi when: 'get resource data is invoked' def cmResourceAddress = new CmResourceAddress(dataStore.datastoreName, cmHandleId, resourceIdentifier) - def result = objectUnderTest.getResourceDataFromDmi(cmResourceAddress, options, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER) + def result = objectUnderTest.getResourceDataFromDmi(cmResourceAddress, options, NO_TOPIC, NO_REQUEST_ID, NO_AUTH_HEADER).block() then: 'the result is the response from the DMI service' - assert result == responseFromDmi + assert result.body == '{some-key:some-value}' + assert result.statusCode.'2xxSuccessful' where: 'the following parameters are used' scenario | dmiProperties | dataStore | options || expectedProperties | expectedDatastoreInUrl | expectedOptionsInUrl 'without properties' | [] | PASSTHROUGH_OPERATIONAL | OPTIONS_PARAM || '{}' | 'passthrough-operational' | '&options=(a%3D1,b%3D2)' |