From 38f34045dc5ecdcca9b70e4aa6182b9553adc0ef Mon Sep 17 00:00:00 2001 From: Alexis de Talhouët Date: Fri, 22 Feb 2019 10:31:14 -0500 Subject: Properly handle async gRPC request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: Ia5016759466c57f749c146a004374c2cbff60f9d Issue-ID: CCSDK-947 Signed-off-by: Alexis de Talhouët --- .../api/BluePrintProcessingGRPCHandler.kt | 17 ++++++------ .../selfservice/api/ExecutionServiceController.kt | 15 +++++++---- .../selfservice/api/ExecutionServiceHandler.kt | 31 +++++++++++++++++----- .../selfservice/api/utils/BluePrintMappings.kt | 2 +- 4 files changed, 43 insertions(+), 22 deletions(-) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index 453306de..cf6776c5 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -19,7 +19,6 @@ package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api import io.grpc.stub.StreamObserver import org.onap.ccsdk.apps.blueprintsprocessor.core.BluePrintCoreConfiguration import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toJava -import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toProto import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceInput import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput @@ -32,14 +31,14 @@ class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: Blu : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) - override fun process(responseObserver: StreamObserver?): StreamObserver { + override fun process( + responseObserver: StreamObserver): StreamObserver { return object : StreamObserver { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { - val output = executionServiceHandler.process(executionServiceInput.toJava()) - .toProto(executionServiceInput.payload) - responseObserver?.onNext(output) + val inputPayload = executionServiceInput.payload + executionServiceHandler.process(executionServiceInput.toJava(), responseObserver, inputPayload) } catch (e: Exception) { onError(e) } @@ -47,13 +46,13 @@ class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: Blu override fun onError(error: Throwable) { log.debug("Fail to process message", error) - responseObserver?.onError(io.grpc.Status.INTERNAL - .withDescription(error.message) - .asException()) + responseObserver.onError(io.grpc.Status.INTERNAL + .withDescription(error.message) + .asException()) } override fun onCompleted() { - responseObserver?.onCompleted() + log.info("Completed") } } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index e4734c44..6477c067 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api import io.swagger.annotations.ApiOperation +import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.springframework.beans.factory.annotation.Autowired @@ -49,15 +50,19 @@ class ExecutionServiceController { @ResponseBody fun upload(@RequestPart("file") parts: Mono): Mono { return parts - .filter { it is FilePart } - .ofType(FilePart::class.java) - .flatMap(executionServiceHandler::upload) + .filter { it is FilePart } + .ofType(FilePart::class.java) + .flatMap(executionServiceHandler::upload) } @RequestMapping(path = ["/process"], method = [RequestMethod.POST], produces = [MediaType.APPLICATION_JSON_VALUE]) - @ApiOperation(value = "Resolve Resource Mappings", notes = "Takes the blueprint information and process as per the payload") + @ApiOperation(value = "Resolve Resource Mappings", + notes = "Takes the blueprint information and process as per the payload") @ResponseBody fun process(@RequestBody executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { - return executionServiceHandler.process(executionServiceInput) + if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { + throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") + } + return executionServiceHandler.processSync(executionServiceInput) } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index ec605c1d..262c33f9 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -16,6 +16,8 @@ package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api +import com.google.protobuf.Struct +import io.grpc.stub.StreamObserver import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch @@ -26,6 +28,7 @@ import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInp import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.saveCBAFile +import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toProto import org.onap.ccsdk.apps.blueprintsprocessor.services.workflow.BlueprintDGExecutionService import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintException @@ -56,20 +59,33 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC } } - fun process(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { - return when { + fun process(executionServiceInput: ExecutionServiceInput, + responseObserver: StreamObserver, + inputPayload: Struct) { + when { executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> { GlobalScope.launch(Dispatchers.Default) { - // TODO post result in DMaaP val executionServiceOutput = doProcess(executionServiceInput) + responseObserver.onNext(executionServiceOutput.toProto(inputPayload)) + responseObserver.onCompleted() } - response(executionServiceInput) + responseObserver.onNext(response(executionServiceInput).toProto(inputPayload)) } - executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> doProcess(executionServiceInput) - else -> response(executionServiceInput, "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", true) + executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> { + val executionServiceOutput = doProcess(executionServiceInput) + responseObserver.onNext(executionServiceOutput.toProto(inputPayload)) + responseObserver.onCompleted() + } + else -> responseObserver.onNext(response(executionServiceInput, + "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", + true).toProto(inputPayload)); } } + fun processSync(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { + return doProcess(executionServiceInput) + } + private fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { val requestId = executionServiceInput.commonHeader.requestId log.info("processing request id $requestId") @@ -87,7 +103,8 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC return blueprintDGExecutionService.executeDirectedGraph(blueprintRuntimeService, executionServiceInput) } - fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "", failure: Boolean = false): ExecutionServiceOutput { + private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "", + failure: Boolean = false): ExecutionServiceOutput { val executionServiceOutput = ExecutionServiceOutput() executionServiceOutput.commonHeader = executionServiceInput.commonHeader executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/utils/BluePrintMappings.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/utils/BluePrintMappings.kt index 220a6fd6..c8ce1c30 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/utils/BluePrintMappings.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/utils/BluePrintMappings.kt @@ -139,7 +139,7 @@ fun Flag.toJava(): org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Flags { fun org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Status.toProto(): Status { val status = Status.newBuilder() status.code = this.code - status.errorMessage = this.errorMessage + status.errorMessage = this.errorMessage ?: "" status.message = this.message status.timestamp = this.timestamp.toString() status.eventType = this.eventType -- cgit 1.2.3-korg