summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-rest/src/main/java/org
diff options
context:
space:
mode:
authorJosephKeenan <joseph.keenan@est.tech>2022-05-24 18:59:25 +0100
committerJosephKeenan <joseph.keenan@est.tech>2022-05-25 10:47:34 +0100
commitf31c7f8bd4985c84f9126d071439c1a4de57f704 (patch)
tree3b5d91b6357705304ae95fe1ad01156afbded020 /cps-ncmp-rest/src/main/java/org
parent4cf4962b74765a5afe234aa258a9143ea6936f73 (diff)
Async request response NCMP -> Client
-Added consumer for DMI events and producer for forwarding to client -Added schemas for events -Updated tests -Added new module for ncmp events -Used mapstruct for event mapping Issue-ID: CPS-830 Change-Id: I096d08af9d69092cf8651e11eaa00ce441fc3605 Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech> Signed-off-by: JosephKeenan <joseph.keenan@est.tech> Signed-off-by: ToineSiebelink <toine.siebelink@est.tech> Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Diffstat (limited to 'cps-ncmp-rest/src/main/java/org')
-rwxr-xr-xcps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java79
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java44
-rw-r--r--cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java60
3 files changed, 142 insertions, 41 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
index cedc94672c..11517bcc9e 100755
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
@@ -46,6 +46,7 @@ import org.onap.cps.ncmp.api.impl.exception.InvalidTopicException;
import org.onap.cps.ncmp.api.models.CmHandleQueryApiParameters;
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
import org.onap.cps.ncmp.rest.api.NetworkCmProxyApi;
+import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper;
import org.onap.cps.ncmp.rest.model.CmHandleProperties;
import org.onap.cps.ncmp.rest.model.CmHandleProperty;
@@ -61,6 +62,7 @@ import org.onap.cps.ncmp.rest.model.RestOutputCmHandle;
import org.onap.cps.ncmp.rest.model.RestOutputCmHandlePublicProperties;
import org.onap.cps.utils.CpsValidator;
import org.onap.cps.utils.JsonObjectMapper;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -75,12 +77,14 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
private static final String NO_BODY = null;
private static final String NO_REQUEST_ID = null;
private static final String NO_TOPIC = null;
- public static final String ASYNC_REQUEST_ID = "requestId";
-
private final NetworkCmProxyDataService networkCmProxyDataService;
private final JsonObjectMapper jsonObjectMapper;
private final NcmpRestInputMapper ncmpRestInputMapper;
private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper;
+ private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
+
+ @Value("${notification.async.executor.time-out-value-in-ms:2000}")
+ private int timeOutInMilliSeconds;
/**
* Get resource data from operational datastore.
@@ -96,19 +100,21 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
final @NotNull @Valid String resourceIdentifier,
final @Valid String optionsParamInQuery,
final @Valid String topicParamInQuery) {
- final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery);
- final Map<String, Object> asyncResponseData = asyncResponse.getBody();
+ if (isValidTopic(topicParamInQuery)) {
+ final String requestId = UUID.randomUUID().toString();
+ cpsNcmpTaskExecutor.executeTask(() ->
+ networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+ cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
+ requestId
+ ), timeOutInMilliSeconds
+ );
+ return acknowledgeAsyncRequest(requestId);
+ }
- final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(cmHandle,
- resourceIdentifier,
- optionsParamInQuery,
- asyncResponseData == null ? NO_TOPIC : topicParamInQuery,
- asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString());
+ final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+ cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID);
- if (asyncResponseData == null) {
- return ResponseEntity.ok(responseObject);
- }
- return ResponseEntity.ok(asyncResponse);
+ return ResponseEntity.ok(responseObject);
}
/**
@@ -125,19 +131,21 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
final @NotNull @Valid String resourceIdentifier,
final @Valid String optionsParamInQuery,
final @Valid String topicParamInQuery) {
- final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery);
- final Map<String, Object> asyncResponseData = asyncResponse.getBody();
+ if (isValidTopic(topicParamInQuery)) {
+ final String resourceDataRequestId = UUID.randomUUID().toString();
+ cpsNcmpTaskExecutor.executeTask(() ->
+ networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
+ cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
+ resourceDataRequestId
+ ), timeOutInMilliSeconds
+ );
+ return acknowledgeAsyncRequest(resourceDataRequestId);
+ }
- final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(cmHandle,
- resourceIdentifier,
- optionsParamInQuery,
- asyncResponseData == null ? NO_TOPIC : topicParamInQuery,
- asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString());
+ final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
+ cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID);
- if (asyncResponseData == null) {
- return ResponseEntity.ok(responseObject);
- }
- return ResponseEntity.ok(asyncResponse);
+ return ResponseEntity.ok(responseObject);
}
@Override
@@ -319,18 +327,7 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
return restOutputCmHandle;
}
- private ResponseEntity<Map<String, Object>> populateAsyncResponse(final String topicParamInQuery) {
- final boolean processAsynchronously = hasTopicParameter(topicParamInQuery);
- final Map<String, Object> responseData;
- if (processAsynchronously) {
- responseData = getAsyncResponseData();
- } else {
- responseData = null;
- }
- return ResponseEntity.ok().body(responseData);
- }
-
- private static boolean hasTopicParameter(final String topicName) {
+ private static boolean isValidTopic(final String topicName) {
if (topicName == null) {
return false;
}
@@ -340,11 +337,11 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic");
}
- private Map<String, Object> getAsyncResponseData() {
- final Map<String, Object> asyncResponseData = new HashMap<>(1);
- final String resourceDataRequestId = UUID.randomUUID().toString();
- asyncResponseData.put(ASYNC_REQUEST_ID, resourceDataRequestId);
- return asyncResponseData;
+ private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) {
+ final Map<String, Object> acknowledgeData = new HashMap<>(1);
+ acknowledgeData.put("requestId", requestId);
+ return ResponseEntity.ok(acknowledgeData);
}
}
+
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java
new file mode 100644
index 0000000000..3e8929d2e3
--- /dev/null
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.rest.exceptions;
+
+import lombok.Getter;
+
+public class CpsTaskExecutionException extends RuntimeException {
+
+ private static final long serialVersionUID = 1481520410918497454L;
+
+ @Getter
+ final String details;
+
+ /**
+ * Constructor.
+ *
+ * @param message the error message
+ * @param details the error details
+ * @param cause the cause of the exception
+ */
+ public CpsTaskExecutionException(final String message, final String details, final Throwable cause) {
+ super(message, cause);
+ this.details = details;
+ }
+
+} \ No newline at end of file
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
new file mode 100644
index 0000000000..93aa2858ca
--- /dev/null
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.rest.executor;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class CpsNcmpTaskExecutor {
+
+ /**
+ * Execute task asynchronously and publish response to supplied topic.
+ *
+ * @param taskSupplier functional method is get() task need to executed asynchronously
+ * @param timeOutInMillis the time out value in milliseconds
+ */
+ public void executeTask(final Supplier<Object> taskSupplier, final int timeOutInMillis) {
+ CompletableFuture.supplyAsync(taskSupplier::get)
+ .orTimeout(timeOutInMillis, MILLISECONDS)
+ .whenCompleteAsync(
+ (responseAsJson, throwable) -> {
+ handleTaskCompletion(throwable);
+ }
+ );
+ }
+
+ private void handleTaskCompletion(final Throwable throwable) {
+ if (throwable == null) {
+ log.info("Async task completed successfully.");
+ } else {
+ log.error("Async task failed. caused by : {}", throwable.getMessage());
+ }
+ }
+}
+
+
+