diff options
author | Brinda Santh Muthuramalingam <brindasanth@in.ibm.com> | 2019-11-06 21:09:46 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-11-06 21:09:46 +0000 |
commit | 88e3ef2d396bc91051b182d11a2d490bf37abf08 (patch) | |
tree | da73e0f1deea4f741369827ad79f65e8e296301f /ms/blueprintsprocessor | |
parent | a835890f817a1e3627c281e98ab786ee7bf8c2ba (diff) | |
parent | 4d411a80405b9031a10fe53a4d021f4f246bbaa2 (diff) |
Merge "Rolling upgrade support for in-flight requests"
Diffstat (limited to 'ms/blueprintsprocessor')
6 files changed, 52 insertions, 16 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/docker/Dockerfile b/ms/blueprintsprocessor/application/src/main/docker/Dockerfile index 207cec5cb..2a85f1c95 100755 --- a/ms/blueprintsprocessor/application/src/main/docker/Dockerfile +++ b/ms/blueprintsprocessor/application/src/main/docker/Dockerfile @@ -1,7 +1,6 @@ FROM omahoco1/alpine-java-python # add entrypoint -COPY run.source /etc/run.source COPY startService.sh /startService.sh RUN chmod 777 /startService.sh && dos2unix /startService.sh @@ -12,4 +11,4 @@ RUN tar -xzf /source.tar.gz -C /tmp \ && rm -rf /source.tar.gz \ && rm -rf /tmp/@project.build.finalName@ -ENTRYPOINT /startService.sh +ENTRYPOINT [ "/startService.sh" ] diff --git a/ms/blueprintsprocessor/application/src/main/docker/run.source b/ms/blueprintsprocessor/application/src/main/docker/run.source deleted file mode 100755 index f3d8c7ca6..000000000 --- a/ms/blueprintsprocessor/application/src/main/docker/run.source +++ /dev/null @@ -1,12 +0,0 @@ -java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \ --DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \ --DrouteOffer=${ROUTEOFFER} \ --DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \ --DSecurityFilePath=/etc \ --DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \ --Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \ --Dlogging.config=${APP_CONFIG_HOME}/logback.xml \ --Djava.security.egd=file:/dev/./urandom \ --DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \ --Dspring.config.location=${APP_CONFIG_HOME}/ \ -org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt diff --git a/ms/blueprintsprocessor/application/src/main/docker/startService.sh b/ms/blueprintsprocessor/application/src/main/docker/startService.sh index 14d772e41..11c471f2d 100644 --- a/ms/blueprintsprocessor/application/src/main/docker/startService.sh +++ b/ms/blueprintsprocessor/application/src/main/docker/startService.sh @@ -7,4 +7,15 @@ export APP_HOME=/opt/app/onap keytool -import -noprompt -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer -source /etc/run.source +exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \ +-DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \ +-DrouteOffer=${ROUTEOFFER} \ +-DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \ +-DSecurityFilePath=/etc \ +-DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \ +-Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \ +-Dlogging.config=${APP_CONFIG_HOME}/logback.xml \ +-Djava.security.egd=file:/dev/./urandom \ +-DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \ +-Dspring.config.location=${APP_CONFIG_HOME}/ \ +org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index eb450d73b..636a55423 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu import org.slf4j.LoggerFactory import org.springframework.security.access.prepost.PreAuthorize import org.springframework.stereotype.Service +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @Service open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration, @@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java) + private val ph = Phaser(1) + @PreAuthorize("hasRole('USER')") override fun process( responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { @@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration return object : StreamObserver<ExecutionServiceInput> { override fun onNext(executionServiceInput: ExecutionServiceInput) { try { + ph.register() runBlocking { executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) } } catch (e: Exception) { onError(e) } + finally { + ph.arriveAndDeregister() + } } override fun onError(error: Throwable) { @@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration } } } + + @PreDestroy + fun preDestroy() { + val name = "BluePrintProcessingGRPCHandler" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index b339903c5..a9dda7e0c 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.util.concurrent.Phaser import javax.annotation.PreDestroy @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], @@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer( val log = logger(BluePrintProcessingKafkaConsumer::class) + private val ph = Phaser(1) + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService companion object { @@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer( channel.consumeEach { message -> launch { try { + ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) @@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer( } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } + finally { + ph.arriveAndDeregister() + } } } } @@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer( log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + "message producer($PRODUCER_SELECTOR)...") blueprintMessageConsumerService.shutDown() + ph.arriveAndAwaitAdvance() } catch (e: Exception) { log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index b246b33e1..130e23ecc 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -34,6 +34,8 @@ import org.springframework.http.ResponseEntity import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* import reactor.core.publisher.Mono +import java.util.concurrent.Phaser +import javax.annotation.PreDestroy @RestController @RequestMapping("/api/v1/execution-service") @@ -42,6 +44,8 @@ import reactor.core.publisher.Mono open class ExecutionServiceController { val log = logger(ExecutionServiceController::class) + private val ph = Phaser(1) + @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler @@ -51,7 +55,6 @@ open class ExecutionServiceController { @ResponseBody @ApiOperation(value = "Health Check", hidden = true) fun executionServiceControllerHealthCheck() = monoMdc(Dispatchers.IO) { - log.info("Health check success...") "Success".asJsonPrimitive() } @@ -69,8 +72,19 @@ open class ExecutionServiceController { if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") } + + ph.register() val processResult = executionServiceHandler.doProcess(executionServiceInput) + ph.arriveAndDeregister() ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) } + + @PreDestroy + fun preDestroy() { + val name = "ExecutionServiceController" + log.info("Starting to shutdown $name waiting for in-flight requests to finish ...") + ph.arriveAndAwaitAdvance() + log.info("Done waiting in $name") + } } |