diff options
Diffstat (limited to 'cps-ncmp-rest/src/main')
5 files changed, 58 insertions, 154 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java index 45c7c33fd2..5b54ac243e 100755 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java @@ -423,7 +423,5 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { } return ncmpPassthroughResourceRequestHandler; } - - } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java index e6d6faf983..80e1c442e9 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java @@ -20,14 +20,15 @@ package org.onap.cps.ncmp.rest.controller.handlers; -import java.util.function.Supplier; +import java.util.Collection; import org.onap.cps.ncmp.api.NetworkCmProxyDataService; import org.onap.cps.ncmp.api.NetworkCmProxyQueryService; import org.onap.cps.ncmp.api.models.CmResourceAddress; -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.spi.FetchDescendantsOption; +import org.onap.cps.spi.model.DataNode; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; @Component public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandler { @@ -38,14 +39,11 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle /** * Constructor. * - * @param cpsNcmpTaskExecutor @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor * @param networkCmProxyDataService @see org.onap.cps.ncmp.api.NetworkCmProxyDataService * @param networkCmProxyQueryService @see org.onap.cps.ncmp.api.NetworkCmProxyQueryService */ - public NcmpCachedResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor, - final NetworkCmProxyDataService networkCmProxyDataService, + public NcmpCachedResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService, final NetworkCmProxyQueryService networkCmProxyQueryService) { - super(cpsNcmpTaskExecutor); this.networkCmProxyDataService = networkCmProxyDataService; this.networkCmProxyQueryService = networkCmProxyQueryService; } @@ -59,35 +57,30 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle * @param includeDescendants whether include descendants * @return the response entity */ - public ResponseEntity<Object> executeRequest(final String cmHandleId, - final String resourceIdentifier, + public ResponseEntity<Object> executeRequest(final String cmHandleId, final String resourceIdentifier, final boolean includeDescendants) { - - final Supplier<Object> taskSupplier = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier, - includeDescendants); - return executeTaskSync(taskSupplier); + final Collection<DataNode> dataNodes = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier, + includeDescendants); + return ResponseEntity.ok(dataNodes); } @Override - protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final String requestId, - final boolean includeDescendants, - final String authorization) { - + protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final String requestId, + final boolean includeDescendants, + final String authorization) { final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants); - - return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption); + return Mono.fromSupplier( + () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption)); } - private Supplier<Object> getTaskSupplierForQueryRequest(final String cmHandleId, - final String resourceIdentifier, - final boolean includeDescendants) { - + private Collection<DataNode> getTaskSupplierForQueryRequest(final String cmHandleId, + final String resourceIdentifier, + final boolean includeDescendants) { final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants); - - return () -> networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier, + return networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier, fetchDescendantsOption); } @@ -95,6 +88,4 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle return includeDescendants ? FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS : FetchDescendantsOption.OMIT_DESCENDANTS; } - - } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java index 1ae16820a1..20059e20f4 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java @@ -22,15 +22,14 @@ package org.onap.cps.ncmp.rest.controller.handlers; import java.util.Map; import java.util.UUID; -import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.models.CmResourceAddress; -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.ncmp.rest.util.TopicValidator; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; @Slf4j @Service @@ -42,12 +41,9 @@ public abstract class NcmpDatastoreRequestHandler { @Value("${notification.async.executor.time-out-value-in-ms:60000}") protected int timeOutInMilliSeconds; - @Value("${notification.enabled:true}") protected boolean notificationFeatureEnabled; - protected final CpsNcmpTaskExecutor cpsNcmpTaskExecutor; - /** * Executes synchronous/asynchronous get request for given cm handle. * @@ -66,7 +62,7 @@ public abstract class NcmpDatastoreRequestHandler { final boolean asyncResponseRequested = topicParamInQuery != null; if (asyncResponseRequested && notificationFeatureEnabled) { - return executeAsyncTaskAndGetResponseEntity(cmResourceAddress, optionsParamInQuery, topicParamInQuery, + return fetchResourceDataAsynchronously(cmResourceAddress, optionsParamInQuery, topicParamInQuery, includeDescendants, authorization); } @@ -74,41 +70,36 @@ public abstract class NcmpDatastoreRequestHandler { log.warn("Asynchronous request is unavailable as notification feature is currently disabled, " + "will use synchronous operation."); } - final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress, optionsParamInQuery, - NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization); - return executeTaskSync(taskSupplier); + final Mono<Object> resourceDataMono = getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, + NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization); + return fetchResourceDataSynchronously(resourceDataMono); } + private ResponseEntity<Object> fetchResourceDataSynchronously(final Mono<Object> resourceDataMono) { + return ResponseEntity.ok(resourceDataMono.block()); + } - private ResponseEntity<Object> executeTaskAsync(final String topicParamInQuery, - final String requestId, - final Supplier<Object> taskSupplier) { + private ResponseEntity<Object> fetchResourceDataAsynchronously(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final boolean includeDescendants, + final String authorization) { TopicValidator.validateTopicName(topicParamInQuery); + final String requestId = UUID.randomUUID().toString(); + getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, topicParamInQuery, requestId, + includeDescendants, authorization) + .doOnSuccess(result -> log.debug("Async operation succeeded for request id {}: {}", requestId, result)) + .doOnError(error -> + log.error("Async operation failed for request id {}: {}", requestId, error.getMessage())) + .subscribe(); log.debug("Received Async request with id {}", requestId); - cpsNcmpTaskExecutor.executeTask(taskSupplier, timeOutInMilliSeconds); return ResponseEntity.ok(Map.of("requestId", requestId)); } - protected ResponseEntity<Object> executeTaskSync(final Supplier<Object> taskSupplier) { - return ResponseEntity.ok(taskSupplier.get()); - } - - private ResponseEntity<Object> executeAsyncTaskAndGetResponseEntity(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final boolean includeDescendants, - final String authorization) { - final String requestId = UUID.randomUUID().toString(); - final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress, - optionsParamInQuery, topicParamInQuery, requestId, includeDescendants, authorization); - return executeTaskAsync(topicParamInQuery, requestId, taskSupplier); - } - - protected abstract Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final String requestId, - final boolean includeDescendant, - final String authorization); - + protected abstract Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final String requestId, + final boolean includeDescendant, + final String authorization); } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java index d716877d54..53e374d30a 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java @@ -25,7 +25,6 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ; import java.util.Map; import java.util.UUID; -import java.util.function.Supplier; import org.onap.cps.ncmp.api.NetworkCmProxyDataService; import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException; import org.onap.cps.ncmp.api.impl.operations.DatastoreType; @@ -34,12 +33,12 @@ import org.onap.cps.ncmp.api.models.CmResourceAddress; import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException; import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException; -import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.ncmp.rest.util.TopicValidator; import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; -@Component +@Service public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler { private final NetworkCmProxyDataService networkCmProxyDataService; @@ -49,12 +48,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH /** * Constructor. * - * @param cpsNcmpTaskExecutor @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor * @param networkCmProxyDataService @see org.onap.cps.ncmp.api.NetworkCmProxyDataService */ - public NcmpPassthroughResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor, - final NetworkCmProxyDataService networkCmProxyDataService) { - super(cpsNcmpTaskExecutor); + public NcmpPassthroughResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService) { this.networkCmProxyDataService = networkCmProxyDataService; } @@ -79,15 +75,14 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH } @Override - protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress, - final String optionsParamInQuery, - final String topicParamInQuery, - final String requestId, - final boolean includeDescendants, - final String authorization) { - - return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, - topicParamInQuery, requestId, authorization); + protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress, + final String optionsParamInQuery, + final String topicParamInQuery, + final String requestId, + final boolean includeDescendants, + final String authorization) { + return networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, + topicParamInQuery, requestId, authorization); } private ResponseEntity<Object> getRequestIdAndSendDataOperationRequestToDmiService( diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java deleted file mode 100644 index 2601c7a5b3..0000000000 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.rest.executor; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; -import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -@Slf4j -@Service -public class CpsNcmpTaskExecutor { - - /** - * Execute a task asynchronously, and invoke completion handler when done. - * - * @param taskSupplier functional method is get() task needed to be executed asynchronously - * @param taskCompletionHandler the action to perform on task completion or error - * @param timeOutInMillis the time-out value in milliseconds - */ - public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier, - final BiConsumer<Object, Throwable> taskCompletionHandler, - final long timeOutInMillis) { - CompletableFuture.supplyAsync(taskSupplier) - .orTimeout(timeOutInMillis, MILLISECONDS) - .whenCompleteAsync(taskCompletionHandler); - } - - /** - * Execute a task asynchronously. - * - * @param taskSupplier functional method is get() task needed to be executed asynchronously - * @param timeOutInMillis the time-out value in milliseconds - */ - public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) { - executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable), - timeOutInMillis); - } - - private void handleTaskCompletion(final Throwable throwable) { - if (throwable == null) { - log.info("Async task completed successfully."); - } else { - log.error("Async task failed. caused by : {}", throwable.toString()); - } - } -} - - - |