From 03c7a8ddc3049f1b79b40ec96913eaaf8a539ef2 Mon Sep 17 00:00:00 2001 From: Sebastien Premont-Tendland Date: Thu, 31 Oct 2019 15:37:48 -0400 Subject: Rolling upgrade support for in-flight requests Three entry points are being handled : 1 - REST endpoint 2 - gRPC endpoint 3 - Kafka consumer We make use of Phaser object to make sure the PreDestroy callback wait for all requests to be executed before stopping the process. The docker image was also modified to make sure the java process becomes PID 1 in the container in order to catch the SIGTERM signal which triggers the PreDestroy callback of Spring. This was done by using the "exec" command in bash. Issue-ID: CCSDK-1885 Signed-off-by: Sebastien Premont-Tendland Change-Id: I64611e569ddb78499aed2375e6186f028d1d8fa0 --- .../selfservice/api/BluePrintProcessingGRPCHandler.kt | 16 ++++++++++++++++ .../api/BluePrintProcessingKafkaConsumer.kt | 8 ++++++++ .../selfservice/api/ExecutionServiceController.kt | 18 ++++++++++++++++++ 3 files changed, 42 insertions(+) (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api') diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index eb450d73b..636a55423 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu import org.slf4j.LoggerFactory import org.springframework.security.access.prepost.PreAuthorize import org.springframework.stereotype.Service +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @Service open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, @@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) + private val ph = Phaser(1) + @PreAuthorize("hasRole('USER')") override fun process( responseObserver: StreamObserver): StreamObserver { @@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration return object : StreamObserver { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { + ph.register() runBlocking { executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) } } catch (e: Exception) { onError(e) } + finally { + ph.arriveAndDeregister() + } } override fun onError(error: Throwable) { @@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration } } } + + @PreDestroy + fun preDestroy() { + val name = "BluePrintProcessingGRPCHandler" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index b339903c5..a9dda7e0c 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.util.concurrent.Phaser import javax.annotation.PreDestroy @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], @@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer( val log = logger(BluePrintProcessingKafkaConsumer::class) + private val ph = Phaser(1) + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService companion object { @@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer( channel.consumeEach { message -> launch { try { + ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) @@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer( } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } + finally { + ph.arriveAndDeregister() + } } } } @@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer( log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + "message producer($PRODUCER_SELECTOR)...") blueprintMessageConsumerService.shutDown() + ph.arriveAndAwaitAdvance() } catch (e: Exception) { log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index d48f0c7e4..cc1229623 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -28,12 +28,15 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.http.codec.multipart.FilePart import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @RestController @RequestMapping("/api/v1/execution-service") @@ -41,6 +44,10 @@ import org.springframework.web.bind.annotation.* description = "Interaction with CBA.") open class ExecutionServiceController { + private val log = logger(ExecutionServiceController::class) + + private val ph = Phaser(1) + @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler @@ -90,7 +97,18 @@ open class ExecutionServiceController { if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") } + + ph.register() val processResult = executionServiceHandler.doProcess(executionServiceInput) + ph.arriveAndDeregister() ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) } + + @PreDestroy + fun preDestroy() { + val name = "ExecutionServiceController" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } } -- cgit 1.2.3-korg