diff options
author | Sebastien Premont-Tendland <sebastien.premont@bell.ca> | 2019-10-31 15:37:48 -0400 |
---|---|---|
committer | Sebastien Premont-Tendland <sebastien.premont@bell.ca> | 2019-11-05 18:05:03 -0500 |
commit | 4d411a80405b9031a10fe53a4d021f4f246bbaa2 (patch) | |
tree | 97a1916b8a82c67a5e599253717a9c5fdb6bc9dd | |
parent | e967b59012d3aec0a5ad01d1ad508fc1e0a61a5f (diff) |
Rolling upgrade support for in-flight requests
Three entry points are being handled :
1 - REST endpoint
2 - gRPC endpoint
3 - Kafka consumer
We make use of Phaser object to make sure the PreDestroy callback wait
for all requests to be executed before stopping the process.
The docker image was also modified to make sure the java process becomes
PID 1 in the container in order to catch the SIGTERM signal which
triggers the PreDestroy callback of Spring. This was done by using
the "exec" command in bash.
Issue-ID: CCSDK-1885
Signed-off-by: Sebastien Premont-Tendland <sebastien.premont@bell.ca>
Change-Id: I3e2a72e26a4c8b7768ebc374ea40aa8d55fb6761
6 files changed, 52 insertions, 16 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/docker/Dockerfile b/ms/blueprintsprocessor/application/src/main/docker/Dockerfile index 207cec5cb..2a85f1c95 100755 --- a/ms/blueprintsprocessor/application/src/main/docker/Dockerfile +++ b/ms/blueprintsprocessor/application/src/main/docker/Dockerfile @@ -1,7 +1,6 @@ FROM omahoco1/alpine-java-python # add entrypoint -COPY run.source /etc/run.source COPY startService.sh /startService.sh RUN chmod 777 /startService.sh && dos2unix /startService.sh @@ -12,4 +11,4 @@ RUN tar -xzf /source.tar.gz -C /tmp \ && rm -rf /source.tar.gz \ && rm -rf /tmp/@project.build.finalName@ -ENTRYPOINT /startService.sh +ENTRYPOINT [ "/startService.sh" ] diff --git a/ms/blueprintsprocessor/application/src/main/docker/run.source b/ms/blueprintsprocessor/application/src/main/docker/run.source deleted file mode 100755 index f3d8c7ca6..000000000 --- a/ms/blueprintsprocessor/application/src/main/docker/run.source +++ /dev/null @@ -1,12 +0,0 @@ -java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \ --DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \ --DrouteOffer=${ROUTEOFFER} \ --DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \ --DSecurityFilePath=/etc \ --DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \ --Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \ --Dlogging.config=${APP_CONFIG_HOME}/logback.xml \ --Djava.security.egd=file:/dev/./urandom \ --DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \ --Dspring.config.location=${APP_CONFIG_HOME}/ \ -org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt diff --git a/ms/blueprintsprocessor/application/src/main/docker/startService.sh b/ms/blueprintsprocessor/application/src/main/docker/startService.sh index 14d772e41..11c471f2d 100644 --- a/ms/blueprintsprocessor/application/src/main/docker/startService.sh +++ b/ms/blueprintsprocessor/application/src/main/docker/startService.sh @@ -7,4 +7,15 @@ export APP_HOME=/opt/app/onap keytool -import -noprompt -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer -source /etc/run.source +exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \ +-DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \ +-DrouteOffer=${ROUTEOFFER} \ +-DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \ +-DSecurityFilePath=/etc \ +-DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \ +-Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \ +-Dlogging.config=${APP_CONFIG_HOME}/logback.xml \ +-Djava.security.egd=file:/dev/./urandom \ +-DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \ +-Dspring.config.location=${APP_CONFIG_HOME}/ \ +org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index eb450d73b..636a55423 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu import org.slf4j.LoggerFactory import org.springframework.security.access.prepost.PreAuthorize import org.springframework.stereotype.Service +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @Service open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, @@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) + private val ph = Phaser(1) + @PreAuthorize("hasRole('USER')") override fun process( responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { @@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration return object : StreamObserver<ExecutionServiceInput> { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { + ph.register() runBlocking { executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) } } catch (e: Exception) { onError(e) } + finally { + ph.arriveAndDeregister() + } } override fun onError(error: Throwable) { @@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration } } } + + @PreDestroy + fun preDestroy() { + val name = "BluePrintProcessingGRPCHandler" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index b339903c5..a9dda7e0c 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.util.concurrent.Phaser import javax.annotation.PreDestroy @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], @@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer( val log = logger(BluePrintProcessingKafkaConsumer::class) + private val ph = Phaser(1) + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService companion object { @@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer( channel.consumeEach { message -> launch { try { + ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) @@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer( } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } + finally { + ph.arriveAndDeregister() + } } } } @@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer( log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + "message producer($PRODUCER_SELECTOR)...") blueprintMessageConsumerService.shutDown() + ph.arriveAndAwaitAdvance() } catch (e: Exception) { log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index b246b33e1..130e23ecc 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -34,6 +34,8 @@ import org.springframework.http.ResponseEntity import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @RestController @RequestMapping("/api/v1/execution-service") @@ -42,6 +44,8 @@ import reactor.core.publisher.Mono open class ExecutionServiceController { val log = logger(ExecutionServiceController::class) + private val ph = Phaser(1) + @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler @@ -51,7 +55,6 @@ open class ExecutionServiceController { @ResponseBody @ApiOperation(value = "Health Check", hidden = true) fun executionServiceControllerHealthCheck() = monoMdc(Dispatchers.IO) { - log.info("Health check success...") "Success".asJsonPrimitive() } @@ -69,8 +72,19 @@ open class ExecutionServiceController { if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") } + + ph.register() val processResult = executionServiceHandler.doProcess(executionServiceInput) + ph.arriveAndDeregister() ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) } + + @PreDestroy + fun preDestroy() { + val name = "ExecutionServiceController" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } } |