diff options
author | JosephKeenan <joseph.keenan@est.tech> | 2022-05-24 18:59:25 +0100 |
---|---|---|
committer | JosephKeenan <joseph.keenan@est.tech> | 2022-05-25 10:47:34 +0100 |
commit | f31c7f8bd4985c84f9126d071439c1a4de57f704 (patch) | |
tree | 3b5d91b6357705304ae95fe1ad01156afbded020 /cps-ncmp-rest/src/main/java/org | |
parent | 4cf4962b74765a5afe234aa258a9143ea6936f73 (diff) |
Async request response NCMP -> Client
-Added consumer for DMI events and producer for forwarding to client
-Added schemas for events
-Updated tests
-Added new module for ncmp events
-Used mapstruct for event mapping
Issue-ID: CPS-830
Change-Id: I096d08af9d69092cf8651e11eaa00ce441fc3605
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Diffstat (limited to 'cps-ncmp-rest/src/main/java/org')
3 files changed, 142 insertions, 41 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java index cedc94672c..11517bcc9e 100755 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java @@ -46,6 +46,7 @@ import org.onap.cps.ncmp.api.impl.exception.InvalidTopicException; import org.onap.cps.ncmp.api.models.CmHandleQueryApiParameters; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; import org.onap.cps.ncmp.rest.api.NetworkCmProxyApi; +import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper; import org.onap.cps.ncmp.rest.model.CmHandleProperties; import org.onap.cps.ncmp.rest.model.CmHandleProperty; @@ -61,6 +62,7 @@ import org.onap.cps.ncmp.rest.model.RestOutputCmHandle; import org.onap.cps.ncmp.rest.model.RestOutputCmHandlePublicProperties; import org.onap.cps.utils.CpsValidator; import org.onap.cps.utils.JsonObjectMapper; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; @@ -75,12 +77,14 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { private static final String NO_BODY = null; private static final String NO_REQUEST_ID = null; private static final String NO_TOPIC = null; - public static final String ASYNC_REQUEST_ID = "requestId"; - private final NetworkCmProxyDataService networkCmProxyDataService; private final JsonObjectMapper jsonObjectMapper; private final NcmpRestInputMapper ncmpRestInputMapper; private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper; + private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor; + + @Value("${notification.async.executor.time-out-value-in-ms:2000}") + private int timeOutInMilliSeconds; /** * Get resource data from operational datastore. @@ -96,19 +100,21 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final @NotNull @Valid String resourceIdentifier, final @Valid String optionsParamInQuery, final @Valid String topicParamInQuery) { - final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery); - final Map<String, Object> asyncResponseData = asyncResponse.getBody(); + if (isValidTopic(topicParamInQuery)) { + final String requestId = UUID.randomUUID().toString(); + cpsNcmpTaskExecutor.executeTask(() -> + networkCmProxyDataService.getResourceDataOperationalForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, + requestId + ), timeOutInMilliSeconds + ); + return acknowledgeAsyncRequest(requestId); + } - final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(cmHandle, - resourceIdentifier, - optionsParamInQuery, - asyncResponseData == null ? NO_TOPIC : topicParamInQuery, - asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString()); + final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID); - if (asyncResponseData == null) { - return ResponseEntity.ok(responseObject); - } - return ResponseEntity.ok(asyncResponse); + return ResponseEntity.ok(responseObject); } /** @@ -125,19 +131,21 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final @NotNull @Valid String resourceIdentifier, final @Valid String optionsParamInQuery, final @Valid String topicParamInQuery) { - final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery); - final Map<String, Object> asyncResponseData = asyncResponse.getBody(); + if (isValidTopic(topicParamInQuery)) { + final String resourceDataRequestId = UUID.randomUUID().toString(); + cpsNcmpTaskExecutor.executeTask(() -> + networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, + resourceDataRequestId + ), timeOutInMilliSeconds + ); + return acknowledgeAsyncRequest(resourceDataRequestId); + } - final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(cmHandle, - resourceIdentifier, - optionsParamInQuery, - asyncResponseData == null ? NO_TOPIC : topicParamInQuery, - asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString()); + final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID); - if (asyncResponseData == null) { - return ResponseEntity.ok(responseObject); - } - return ResponseEntity.ok(asyncResponse); + return ResponseEntity.ok(responseObject); } @Override @@ -319,18 +327,7 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { return restOutputCmHandle; } - private ResponseEntity<Map<String, Object>> populateAsyncResponse(final String topicParamInQuery) { - final boolean processAsynchronously = hasTopicParameter(topicParamInQuery); - final Map<String, Object> responseData; - if (processAsynchronously) { - responseData = getAsyncResponseData(); - } else { - responseData = null; - } - return ResponseEntity.ok().body(responseData); - } - - private static boolean hasTopicParameter(final String topicName) { + private static boolean isValidTopic(final String topicName) { if (topicName == null) { return false; } @@ -340,11 +337,11 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic"); } - private Map<String, Object> getAsyncResponseData() { - final Map<String, Object> asyncResponseData = new HashMap<>(1); - final String resourceDataRequestId = UUID.randomUUID().toString(); - asyncResponseData.put(ASYNC_REQUEST_ID, resourceDataRequestId); - return asyncResponseData; + private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) { + final Map<String, Object> acknowledgeData = new HashMap<>(1); + acknowledgeData.put("requestId", requestId); + return ResponseEntity.ok(acknowledgeData); } } + diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java new file mode 100644 index 0000000000..3e8929d2e3 --- /dev/null +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.rest.exceptions; + +import lombok.Getter; + +public class CpsTaskExecutionException extends RuntimeException { + + private static final long serialVersionUID = 1481520410918497454L; + + @Getter + final String details; + + /** + * Constructor. + * + * @param message the error message + * @param details the error details + * @param cause the cause of the exception + */ + public CpsTaskExecutionException(final String message, final String details, final Throwable cause) { + super(message, cause); + this.details = details; + } + +}
\ No newline at end of file diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java new file mode 100644 index 0000000000..93aa2858ca --- /dev/null +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.rest.executor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class CpsNcmpTaskExecutor { + + /** + * Execute task asynchronously and publish response to supplied topic. + * + * @param taskSupplier functional method is get() task need to executed asynchronously + * @param timeOutInMillis the time out value in milliseconds + */ + public void executeTask(final Supplier<Object> taskSupplier, final int timeOutInMillis) { + CompletableFuture.supplyAsync(taskSupplier::get) + .orTimeout(timeOutInMillis, MILLISECONDS) + .whenCompleteAsync( + (responseAsJson, throwable) -> { + handleTaskCompletion(throwable); + } + ); + } + + private void handleTaskCompletion(final Throwable throwable) { + if (throwable == null) { + log.info("Async task completed successfully."); + } else { + log.error("Async task failed. caused by : {}", throwable.getMessage()); + } + } +} + + + |