diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
3 files changed, 39 insertions, 1 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index eb450d73b..636a55423 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu import org.slf4j.LoggerFactory import org.springframework.security.access.prepost.PreAuthorize import org.springframework.stereotype.Service +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @Service open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, @@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) + private val ph = Phaser(1) + @PreAuthorize("hasRole('USER')") override fun process( responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { @@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration return object : StreamObserver<ExecutionServiceInput> { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { + ph.register() runBlocking { executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) } } catch (e: Exception) { onError(e) } + finally { + ph.arriveAndDeregister() + } } override fun onError(error: Throwable) { @@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration } } } + + @PreDestroy + fun preDestroy() { + val name = "BluePrintProcessingGRPCHandler" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index b339903c5..a9dda7e0c 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.util.concurrent.Phaser import javax.annotation.PreDestroy @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], @@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer( val log = logger(BluePrintProcessingKafkaConsumer::class) + private val ph = Phaser(1) + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService companion object { @@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer( channel.consumeEach { message -> launch { try { + ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) @@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer( } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } + finally { + ph.arriveAndDeregister() + } } } } @@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer( log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + "message producer($PRODUCER_SELECTOR)...") blueprintMessageConsumerService.shutDown() + ph.arriveAndAwaitAdvance() } catch (e: Exception) { log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index b246b33e1..130e23ecc 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -34,6 +34,8 @@ import org.springframework.http.ResponseEntity import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @RestController @RequestMapping("/api/v1/execution-service") @@ -42,6 +44,8 @@ import reactor.core.publisher.Mono open class ExecutionServiceController { val log = logger(ExecutionServiceController::class) + private val ph = Phaser(1) + @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler @@ -51,7 +55,6 @@ open class ExecutionServiceController { @ResponseBody @ApiOperation(value = "Health Check", hidden = true) fun executionServiceControllerHealthCheck() = monoMdc(Dispatchers.IO) { - log.info("Health check success...") "Success".asJsonPrimitive() } @@ -69,8 +72,19 @@ open class ExecutionServiceController { if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") } + + ph.register() val processResult = executionServiceHandler.doProcess(executionServiceInput) + ph.arriveAndDeregister() ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) } + + @PreDestroy + fun preDestroy() { + val name = "ExecutionServiceController" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } } |