diff options
author | Sebastien Premont-Tendland <sebastien.premont@bell.ca> | 2019-10-31 15:37:48 -0400 |
---|---|---|
committer | Sebastien Premont-Tendland <sebastien.premont@bell.ca> | 2019-11-05 18:05:03 -0500 |
commit | 4d411a80405b9031a10fe53a4d021f4f246bbaa2 (patch) | |
tree | 97a1916b8a82c67a5e599253717a9c5fdb6bc9dd /ms/blueprintsprocessor/modules/inbounds | |
parent | e967b59012d3aec0a5ad01d1ad508fc1e0a61a5f (diff) |
Rolling upgrade support for in-flight requests
Three entry points are being handled :
1 - REST endpoint
2 - gRPC endpoint
3 - Kafka consumer
We make use of Phaser object to make sure the PreDestroy callback wait
for all requests to be executed before stopping the process.
The docker image was also modified to make sure the java process becomes
PID 1 in the container in order to catch the SIGTERM signal which
triggers the PreDestroy callback of Spring. This was done by using
the "exec" command in bash.
Issue-ID: CCSDK-1885
Signed-off-by: Sebastien Premont-Tendland <sebastien.premont@bell.ca>
Change-Id: I3e2a72e26a4c8b7768ebc374ea40aa8d55fb6761
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
3 files changed, 39 insertions, 1 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index eb450d73b..636a55423 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu import org.slf4j.LoggerFactory import org.springframework.security.access.prepost.PreAuthorize import org.springframework.stereotype.Service +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @Service open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, @@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) + private val ph = Phaser(1) + @PreAuthorize("hasRole('USER')") override fun process( responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { @@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration return object : StreamObserver<ExecutionServiceInput> { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { + ph.register() runBlocking { executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) } } catch (e: Exception) { onError(e) } + finally { + ph.arriveAndDeregister() + } } override fun onError(error: Throwable) { @@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration } } } + + @PreDestroy + fun preDestroy() { + val name = "BluePrintProcessingGRPCHandler" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index b339903c5..a9dda7e0c 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.util.concurrent.Phaser import javax.annotation.PreDestroy @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], @@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer( val log = logger(BluePrintProcessingKafkaConsumer::class) + private val ph = Phaser(1) + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService companion object { @@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer( channel.consumeEach { message -> launch { try { + ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) @@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer( } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } + finally { + ph.arriveAndDeregister() + } } } } @@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer( log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + "message producer($PRODUCER_SELECTOR)...") blueprintMessageConsumerService.shutDown() + ph.arriveAndAwaitAdvance() } catch (e: Exception) { log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index b246b33e1..130e23ecc 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -34,6 +34,8 @@ import org.springframework.http.ResponseEntity import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @RestController @RequestMapping("/api/v1/execution-service") @@ -42,6 +44,8 @@ import reactor.core.publisher.Mono open class ExecutionServiceController { val log = logger(ExecutionServiceController::class) + private val ph = Phaser(1) + @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler @@ -51,7 +55,6 @@ open class ExecutionServiceController { @ResponseBody @ApiOperation(value = "Health Check", hidden = true) fun executionServiceControllerHealthCheck() = monoMdc(Dispatchers.IO) { - log.info("Health check success...") "Success".asJsonPrimitive() } @@ -69,8 +72,19 @@ open class ExecutionServiceController { if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") } + + ph.register() val processResult = executionServiceHandler.doProcess(executionServiceInput) + ph.arriveAndDeregister() ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) } + + @PreDestroy + fun preDestroy() { + val name = "ExecutionServiceController" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } } |