summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-rest/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-rest/src/main')
-rwxr-xr-xcps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java2
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java49
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java61
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java29
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java71
5 files changed, 58 insertions, 154 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
index 45c7c33fd2..5b54ac243e 100755
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
@@ -423,7 +423,5 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
}
return ncmpPassthroughResourceRequestHandler;
}
-
-
}
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java
index e6d6faf983..80e1c442e9 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpCachedResourceRequestHandler.java
@@ -20,14 +20,15 @@
package org.onap.cps.ncmp.rest.controller.handlers;
-import java.util.function.Supplier;
+import java.util.Collection;
import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
import org.onap.cps.ncmp.api.NetworkCmProxyQueryService;
import org.onap.cps.ncmp.api.models.CmResourceAddress;
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
import org.onap.cps.spi.FetchDescendantsOption;
+import org.onap.cps.spi.model.DataNode;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
@Component
public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandler {
@@ -38,14 +39,11 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle
/**
* Constructor.
*
- * @param cpsNcmpTaskExecutor @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
* @param networkCmProxyDataService @see org.onap.cps.ncmp.api.NetworkCmProxyDataService
* @param networkCmProxyQueryService @see org.onap.cps.ncmp.api.NetworkCmProxyQueryService
*/
- public NcmpCachedResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor,
- final NetworkCmProxyDataService networkCmProxyDataService,
+ public NcmpCachedResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService,
final NetworkCmProxyQueryService networkCmProxyQueryService) {
- super(cpsNcmpTaskExecutor);
this.networkCmProxyDataService = networkCmProxyDataService;
this.networkCmProxyQueryService = networkCmProxyQueryService;
}
@@ -59,35 +57,30 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle
* @param includeDescendants whether include descendants
* @return the response entity
*/
- public ResponseEntity<Object> executeRequest(final String cmHandleId,
- final String resourceIdentifier,
+ public ResponseEntity<Object> executeRequest(final String cmHandleId, final String resourceIdentifier,
final boolean includeDescendants) {
-
- final Supplier<Object> taskSupplier = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier,
- includeDescendants);
- return executeTaskSync(taskSupplier);
+ final Collection<DataNode> dataNodes = getTaskSupplierForQueryRequest(cmHandleId, resourceIdentifier,
+ includeDescendants);
+ return ResponseEntity.ok(dataNodes);
}
@Override
- protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress,
- final String optionsParamInQuery,
- final String topicParamInQuery,
- final String requestId,
- final boolean includeDescendants,
- final String authorization) {
-
+ protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+ final String optionsParamInQuery,
+ final String topicParamInQuery,
+ final String requestId,
+ final boolean includeDescendants,
+ final String authorization) {
final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants);
-
- return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption);
+ return Mono.fromSupplier(
+ () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, fetchDescendantsOption));
}
- private Supplier<Object> getTaskSupplierForQueryRequest(final String cmHandleId,
- final String resourceIdentifier,
- final boolean includeDescendants) {
-
+ private Collection<DataNode> getTaskSupplierForQueryRequest(final String cmHandleId,
+ final String resourceIdentifier,
+ final boolean includeDescendants) {
final FetchDescendantsOption fetchDescendantsOption = getFetchDescendantsOption(includeDescendants);
-
- return () -> networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier,
+ return networkCmProxyQueryService.queryResourceDataOperational(cmHandleId, resourceIdentifier,
fetchDescendantsOption);
}
@@ -95,6 +88,4 @@ public class NcmpCachedResourceRequestHandler extends NcmpDatastoreRequestHandle
return includeDescendants ? FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS
: FetchDescendantsOption.OMIT_DESCENDANTS;
}
-
-
}
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java
index 1ae16820a1..20059e20f4 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandler.java
@@ -22,15 +22,14 @@ package org.onap.cps.ncmp.rest.controller.handlers;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.api.models.CmResourceAddress;
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
import org.onap.cps.ncmp.rest.util.TopicValidator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
@Slf4j
@Service
@@ -42,12 +41,9 @@ public abstract class NcmpDatastoreRequestHandler {
@Value("${notification.async.executor.time-out-value-in-ms:60000}")
protected int timeOutInMilliSeconds;
-
@Value("${notification.enabled:true}")
protected boolean notificationFeatureEnabled;
- protected final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
-
/**
* Executes synchronous/asynchronous get request for given cm handle.
*
@@ -66,7 +62,7 @@ public abstract class NcmpDatastoreRequestHandler {
final boolean asyncResponseRequested = topicParamInQuery != null;
if (asyncResponseRequested && notificationFeatureEnabled) {
- return executeAsyncTaskAndGetResponseEntity(cmResourceAddress, optionsParamInQuery, topicParamInQuery,
+ return fetchResourceDataAsynchronously(cmResourceAddress, optionsParamInQuery, topicParamInQuery,
includeDescendants, authorization);
}
@@ -74,41 +70,36 @@ public abstract class NcmpDatastoreRequestHandler {
log.warn("Asynchronous request is unavailable as notification feature is currently disabled, "
+ "will use synchronous operation.");
}
- final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress, optionsParamInQuery,
- NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization);
- return executeTaskSync(taskSupplier);
+ final Mono<Object> resourceDataMono = getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery,
+ NO_TOPIC, NO_REQUEST_ID, includeDescendants, authorization);
+ return fetchResourceDataSynchronously(resourceDataMono);
}
+ private ResponseEntity<Object> fetchResourceDataSynchronously(final Mono<Object> resourceDataMono) {
+ return ResponseEntity.ok(resourceDataMono.block());
+ }
- private ResponseEntity<Object> executeTaskAsync(final String topicParamInQuery,
- final String requestId,
- final Supplier<Object> taskSupplier) {
+ private ResponseEntity<Object> fetchResourceDataAsynchronously(final CmResourceAddress cmResourceAddress,
+ final String optionsParamInQuery,
+ final String topicParamInQuery,
+ final boolean includeDescendants,
+ final String authorization) {
TopicValidator.validateTopicName(topicParamInQuery);
+ final String requestId = UUID.randomUUID().toString();
+ getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery, topicParamInQuery, requestId,
+ includeDescendants, authorization)
+ .doOnSuccess(result -> log.debug("Async operation succeeded for request id {}: {}", requestId, result))
+ .doOnError(error ->
+ log.error("Async operation failed for request id {}: {}", requestId, error.getMessage()))
+ .subscribe();
log.debug("Received Async request with id {}", requestId);
- cpsNcmpTaskExecutor.executeTask(taskSupplier, timeOutInMilliSeconds);
return ResponseEntity.ok(Map.of("requestId", requestId));
}
- protected ResponseEntity<Object> executeTaskSync(final Supplier<Object> taskSupplier) {
- return ResponseEntity.ok(taskSupplier.get());
- }
-
- private ResponseEntity<Object> executeAsyncTaskAndGetResponseEntity(final CmResourceAddress cmResourceAddress,
- final String optionsParamInQuery,
- final String topicParamInQuery,
- final boolean includeDescendants,
- final String authorization) {
- final String requestId = UUID.randomUUID().toString();
- final Supplier<Object> taskSupplier = getTaskSupplierForGetRequest(cmResourceAddress,
- optionsParamInQuery, topicParamInQuery, requestId, includeDescendants, authorization);
- return executeTaskAsync(topicParamInQuery, requestId, taskSupplier);
- }
-
- protected abstract Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress,
- final String optionsParamInQuery,
- final String topicParamInQuery,
- final String requestId,
- final boolean includeDescendant,
- final String authorization);
-
+ protected abstract Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+ final String optionsParamInQuery,
+ final String topicParamInQuery,
+ final String requestId,
+ final boolean includeDescendant,
+ final String authorization);
}
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
index d716877d54..53e374d30a 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
@@ -25,7 +25,6 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Supplier;
import org.onap.cps.ncmp.api.NetworkCmProxyDataService;
import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException;
import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
@@ -34,12 +33,12 @@ import org.onap.cps.ncmp.api.models.CmResourceAddress;
import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException;
-import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
import org.onap.cps.ncmp.rest.util.TopicValidator;
import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
-@Component
+@Service
public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestHandler {
private final NetworkCmProxyDataService networkCmProxyDataService;
@@ -49,12 +48,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
/**
* Constructor.
*
- * @param cpsNcmpTaskExecutor @see org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
* @param networkCmProxyDataService @see org.onap.cps.ncmp.api.NetworkCmProxyDataService
*/
- public NcmpPassthroughResourceRequestHandler(final CpsNcmpTaskExecutor cpsNcmpTaskExecutor,
- final NetworkCmProxyDataService networkCmProxyDataService) {
- super(cpsNcmpTaskExecutor);
+ public NcmpPassthroughResourceRequestHandler(final NetworkCmProxyDataService networkCmProxyDataService) {
this.networkCmProxyDataService = networkCmProxyDataService;
}
@@ -79,15 +75,14 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH
}
@Override
- protected Supplier<Object> getTaskSupplierForGetRequest(final CmResourceAddress cmResourceAddress,
- final String optionsParamInQuery,
- final String topicParamInQuery,
- final String requestId,
- final boolean includeDescendants,
- final String authorization) {
-
- return () -> networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery,
- topicParamInQuery, requestId, authorization);
+ protected Mono<Object> getResourceDataForCmHandle(final CmResourceAddress cmResourceAddress,
+ final String optionsParamInQuery,
+ final String topicParamInQuery,
+ final String requestId,
+ final boolean includeDescendants,
+ final String authorization) {
+ return networkCmProxyDataService.getResourceDataForCmHandle(cmResourceAddress, optionsParamInQuery,
+ topicParamInQuery, requestId, authorization);
}
private ResponseEntity<Object> getRequestIdAndSendDataOperationRequestToDmiService(
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
deleted file mode 100644
index 2601c7a5b3..0000000000
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.rest.executor;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class CpsNcmpTaskExecutor {
-
- /**
- * Execute a task asynchronously, and invoke completion handler when done.
- *
- * @param taskSupplier functional method is get() task needed to be executed asynchronously
- * @param taskCompletionHandler the action to perform on task completion or error
- * @param timeOutInMillis the time-out value in milliseconds
- */
- public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier,
- final BiConsumer<Object, Throwable> taskCompletionHandler,
- final long timeOutInMillis) {
- CompletableFuture.supplyAsync(taskSupplier)
- .orTimeout(timeOutInMillis, MILLISECONDS)
- .whenCompleteAsync(taskCompletionHandler);
- }
-
- /**
- * Execute a task asynchronously.
- *
- * @param taskSupplier functional method is get() task needed to be executed asynchronously
- * @param timeOutInMillis the time-out value in milliseconds
- */
- public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
- executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable),
- timeOutInMillis);
- }
-
- private void handleTaskCompletion(final Throwable throwable) {
- if (throwable == null) {
- log.info("Async task completed successfully.");
- } else {
- log.error("Async task failed. caused by : {}", throwable.toString());
- }
- }
-}
-
-
-