diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
3 files changed, 32 insertions, 37 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index aadbec83a..ebeda69b5 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -1,5 +1,6 @@ /* * Copyright © 2017-2018 AT&T Intellectual Property. + * Modifications Copyright © 2019 IBM. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api import io.grpc.stub.StreamObserver +import kotlinx.coroutines.runBlocking import org.onap.ccsdk.apps.blueprintsprocessor.core.BluePrintCoreConfiguration import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toJava import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc @@ -28,18 +30,20 @@ import org.springframework.stereotype.Service @Service open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, - private val executionServiceHandler: ExecutionServiceHandler) + private val executionServiceHandler: ExecutionServiceHandler) : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) @PreAuthorize("hasRole('USER')") override fun process( - responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { + responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { return object : StreamObserver<ExecutionServiceInput> { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { - executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) + runBlocking { + executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) + } } catch (e: Exception) { onError(e) } @@ -48,8 +52,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration override fun onError(error: Throwable) { log.debug("Fail to process message", error) responseObserver.onError(io.grpc.Status.INTERNAL - .withDescription(error.message) - .asException()) + .withDescription(error.message) + .asException()) } override fun onCompleted() { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index 16f0fa869..6e7294791 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -1,5 +1,6 @@ /* * Copyright © 2017-2018 AT&T Intellectual Property. + * Modifications Copyright © 2019 IBM. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api import io.swagger.annotations.ApiOperation +import kotlinx.coroutines.runBlocking import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput @@ -24,13 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.MediaType import org.springframework.http.codec.multipart.FilePart import org.springframework.security.access.prepost.PreAuthorize -import org.springframework.web.bind.annotation.PostMapping -import org.springframework.web.bind.annotation.RequestBody -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.RequestMethod -import org.springframework.web.bind.annotation.RequestPart -import org.springframework.web.bind.annotation.ResponseBody -import org.springframework.web.bind.annotation.RestController +import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono @RestController @@ -42,8 +38,8 @@ open class ExecutionServiceController { @RequestMapping(path = ["/ping"], method = [RequestMethod.GET], produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody - fun ping(): Mono<String> { - return Mono.just("Success") + fun ping(): String = runBlocking { + "Success" } @PostMapping(path = ["/upload"], consumes = [MediaType.MULTIPART_FORM_DATA_VALUE]) @@ -52,20 +48,20 @@ open class ExecutionServiceController { @PreAuthorize("hasRole('USER')") fun upload(@RequestPart("file") parts: Mono<FilePart>): Mono<String> { return parts - .filter { it is FilePart } - .ofType(FilePart::class.java) - .flatMap(executionServiceHandler::upload) + .filter { it is FilePart } + .ofType(FilePart::class.java) + .flatMap(executionServiceHandler::upload) } @RequestMapping(path = ["/process"], method = [RequestMethod.POST], produces = [MediaType.APPLICATION_JSON_VALUE]) @ApiOperation(value = "Resolve Resource Mappings", - notes = "Takes the blueprint information and process as per the payload") + notes = "Takes the blueprint information and process as per the payload") @ResponseBody @PreAuthorize("hasRole('USER')") - fun process(@RequestBody executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { + fun process(@RequestBody executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput = runBlocking { if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") } - return executionServiceHandler.processSync(executionServiceInput) + executionServiceHandler.doProcess(executionServiceInput) } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index 5278c17ed..d8afe1688 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -1,5 +1,6 @@ /* * Copyright © 2017-2018 AT&T Intellectual Property. + * Modifications Copyright © 2019 IBM. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,18 +23,14 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.onap.ccsdk.apps.blueprintsprocessor.core.BluePrintCoreConfiguration -import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC -import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC -import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput -import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.* import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.saveCBAFile import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toProto -import org.onap.ccsdk.apps.blueprintsprocessor.services.workflow.BlueprintDGExecutionService import org.onap.ccsdk.apps.controllerblueprints.common.api.EventType import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintException import org.onap.ccsdk.apps.controllerblueprints.core.interfaces.BluePrintCatalogService +import org.onap.ccsdk.apps.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService import org.onap.ccsdk.apps.controllerblueprints.core.utils.BluePrintFileUtils import org.onap.ccsdk.apps.controllerblueprints.core.utils.BluePrintMetadataUtils import org.slf4j.LoggerFactory @@ -44,7 +41,8 @@ import reactor.core.publisher.Mono @Service class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, private val bluePrintCatalogService: BluePrintCatalogService, - private val blueprintDGExecutionService: BlueprintDGExecutionService) { + private val bluePrintWorkflowExecutionService + : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>) { private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString()) @@ -60,8 +58,8 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC } } - fun process(executionServiceInput: ExecutionServiceInput, - responseObserver: StreamObserver<org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput>) { + suspend fun process(executionServiceInput: ExecutionServiceInput, + responseObserver: StreamObserver<org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput>) { when { executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> { GlobalScope.launch(Dispatchers.Default) { @@ -77,16 +75,12 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC responseObserver.onCompleted() } else -> responseObserver.onNext(response(executionServiceInput, - "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", - true).toProto()); + "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", + true).toProto()); } } - fun processSync(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { - return doProcess(executionServiceInput) - } - - private fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { + suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { val requestId = executionServiceInput.commonHeader.requestId log.info("processing request id $requestId") @@ -100,7 +94,8 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) - return blueprintDGExecutionService.executeDirectedGraph(blueprintRuntimeService, executionServiceInput) + return bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService, + executionServiceInput, hashMapOf()) } private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "", |