diff options
Diffstat (limited to 'cps-ncmp-rest')
5 files changed, 53 insertions, 9 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java index 64497b95f9..1f87865f21 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java @@ -25,11 +25,13 @@ import static org.onap.cps.ncmp.api.impl.operations.OperationType.READ; import java.util.Map; import java.util.UUID; +import java.util.function.BiConsumer; import java.util.function.Supplier; import org.onap.cps.ncmp.api.NetworkCmProxyDataService; import org.onap.cps.ncmp.api.impl.exception.InvalidDatastoreException; import org.onap.cps.ncmp.api.impl.operations.DatastoreType; import org.onap.cps.ncmp.api.impl.operations.OperationType; +import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils; import org.onap.cps.ncmp.api.models.CmResourceAddress; import org.onap.cps.ncmp.api.models.DataOperationRequest; import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException; @@ -99,8 +101,9 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH final DataOperationRequest dataOperationRequest, final String authorization) { final String requestId = UUID.randomUUID().toString(); - cpsNcmpTaskExecutor.executeTask( + cpsNcmpTaskExecutor.executeTaskWithErrorHandling( getTaskSupplierForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId, authorization), + getTaskCompletionHandlerForDataOperationRequest(topicParamInQuery, dataOperationRequest, requestId), timeOutInMilliSeconds); return ResponseEntity.ok(Map.of("requestId", requestId)); } @@ -139,4 +142,13 @@ public class NcmpPassthroughResourceRequestHandler extends NcmpDatastoreRequestH }; } + private static BiConsumer<Object, Throwable> getTaskCompletionHandlerForDataOperationRequest( + final String topicParamInQuery, + final DataOperationRequest dataOperationRequest, + final String requestId) { + return (result, throwable) -> + ResourceDataOperationRequestUtils.handleAsyncTaskCompletionForDataOperationsRequest(topicParamInQuery, + requestId, dataOperationRequest, throwable); + } + } diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java index ba68d5b757..2601c7a5b3 100644 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ package org.onap.cps.ncmp.rest.executor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -32,15 +33,29 @@ import org.springframework.stereotype.Service; public class CpsNcmpTaskExecutor { /** + * Execute a task asynchronously, and invoke completion handler when done. + * + * @param taskSupplier functional method is get() task needed to be executed asynchronously + * @param taskCompletionHandler the action to perform on task completion or error + * @param timeOutInMillis the time-out value in milliseconds + */ + public void executeTaskWithErrorHandling(final Supplier<Object> taskSupplier, + final BiConsumer<Object, Throwable> taskCompletionHandler, + final long timeOutInMillis) { + CompletableFuture.supplyAsync(taskSupplier) + .orTimeout(timeOutInMillis, MILLISECONDS) + .whenCompleteAsync(taskCompletionHandler); + } + + /** * Execute a task asynchronously. * - * @param taskSupplier functional method is get() task need to executed asynchronously + * @param taskSupplier functional method is get() task needed to be executed asynchronously * @param timeOutInMillis the time-out value in milliseconds */ public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) { - CompletableFuture.supplyAsync(taskSupplier::get) - .orTimeout(timeOutInMillis, MILLISECONDS) - .whenCompleteAsync((taskResult, throwable) -> handleTaskCompletion(throwable)); + executeTaskWithErrorHandling(taskSupplier, (taskResult, throwable) -> handleTaskCompletion(throwable), + timeOutInMillis); } private void handleTaskCompletion(final Throwable throwable) { diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index a5b1f05ee1..2d7e9b2d03 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -198,7 +198,7 @@ class NetworkCmProxyControllerSpec extends Specification { and: 'async request id is generated' assert response.contentAsString.contains('requestId') then: 'the request is handled asynchronously' - 1 * mockCpsTaskExecutor.executeTask(*_) + 1 * mockCpsTaskExecutor.executeTaskWithErrorHandling(*_) where: 'the following data stores are used' datastore << [PASSTHROUGH_RUNNING, PASSTHROUGH_OPERATIONAL] } diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy index 1585616870..641715d0d2 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy @@ -79,7 +79,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { when: 'data operation request is executed' objectUnderTest.executeRequest('someTopic', new DataOperationRequest(), NO_AUTH_HEADER) then: 'the task is executed in an async fashion or not' - expectedCalls * spiedCpsNcmpTaskExecutor.executeTask(*_) + expectedCalls * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_) where: 'the following parameters are used' scenario | notificationFeatureEnabled || expectedCalls 'on' | true || 1 @@ -101,7 +101,7 @@ class NcmpDatastoreRequestHandlerSpec extends Specification { when: 'data operation request is executed' objectUnderTest.executeRequest('myTopic', dataOperationRequest, NO_AUTH_HEADER) then: 'the task is executed in an async fashion' - 1 * spiedCpsNcmpTaskExecutor.executeTask(*_) + 1 * spiedCpsNcmpTaskExecutor.executeTaskWithErrorHandling(*_) and: 'the network service is invoked' new PollingConditions().within(1) { assert networkServiceMethodCalled == true diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy index 010eda964d..4c8c40f4e6 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutorSpec.groovy @@ -33,6 +33,7 @@ class CpsNcmpTaskExecutorSpec extends Specification { def objectUnderTest = new CpsNcmpTaskExecutor() def logger = Spy(ListAppender<ILoggingEvent>) def enoughTime = 100 + def notEnoughTime = 10 void setup() { ((Logger) LoggerFactory.getLogger(CpsNcmpTaskExecutor.class)).addAppender(logger) @@ -67,6 +68,18 @@ class CpsNcmpTaskExecutorSpec extends Specification { assert loggingEvent.formattedMessage.contains('original exception message') } + def 'Task times out.'() { + when: 'task is executed without enough time to complete' + objectUnderTest.executeTask(taskSupplierForLongRunningTask(), notEnoughTime) + then: 'an event is logged with level ERROR' + new PollingConditions().within(1) { + def loggingEvent = getLoggingEvent() + assert loggingEvent.level == Level.ERROR + } + and: 'a timeout error message is logged' + assert loggingEvent.formattedMessage.contains('java.util.concurrent.TimeoutException') + } + def taskSupplier() { return () -> 'hello world' } @@ -75,6 +88,10 @@ class CpsNcmpTaskExecutorSpec extends Specification { return () -> { throw new RuntimeException('original exception message') } } + def taskSupplierForLongRunningTask() { + return () -> { sleep(enoughTime) } + } + def getLoggingEvent() { return logger.list[0] } |