summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt16
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt8
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt18
3 files changed, 42 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
index eb450d73b..636a55423 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
@@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu
import org.slf4j.LoggerFactory
import org.springframework.security.access.prepost.PreAuthorize
import org.springframework.stereotype.Service
+import java.util.concurrent.Phaser
+import javax.annotation.PreDestroy
@Service
open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration,
@@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
: BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java)
+ private val ph = Phaser(1)
+
@PreAuthorize("hasRole('USER')")
override fun process(
responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
@@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
return object : StreamObserver<ExecutionServiceInput> {
override fun onNext(executionServiceInput: ExecutionServiceInput) {
try {
+ ph.register()
runBlocking {
executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
}
} catch (e: Exception) {
onError(e)
}
+ finally {
+ ph.arriveAndDeregister()
+ }
}
override fun onError(error: Throwable) {
@@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
}
}
}
+
+ @PreDestroy
+ fun preDestroy() {
+ val name = "BluePrintProcessingGRPCHandler"
+ log.info("Starting to shutdown $name waiting for in-flight requests to finish ...")
+ ph.arriveAndAwaitAdvance()
+ log.info("Done waiting in $name")
+ }
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index b339903c5..a9dda7e0c 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
+import java.util.concurrent.Phaser
import javax.annotation.PreDestroy
@ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
@@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer(
val log = logger(BluePrintProcessingKafkaConsumer::class)
+ private val ph = Phaser(1)
+
private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
companion object {
@@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer(
channel.consumeEach { message ->
launch {
try {
+ ph.register()
log.trace("Consumed Message : $message")
val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
@@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer(
} catch (e: Exception) {
log.error("failed in processing the consumed message : $message", e)
}
+ finally {
+ ph.arriveAndDeregister()
+ }
}
}
}
@@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer(
log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
"message producer($PRODUCER_SELECTOR)...")
blueprintMessageConsumerService.shutDown()
+ ph.arriveAndAwaitAdvance()
} catch (e: Exception) {
log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
index d48f0c7e4..cc1229623 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
@@ -28,12 +28,15 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp
import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.multipart.FilePart
import org.springframework.security.access.prepost.PreAuthorize
import org.springframework.web.bind.annotation.*
+import java.util.concurrent.Phaser
+import javax.annotation.PreDestroy
@RestController
@RequestMapping("/api/v1/execution-service")
@@ -41,6 +44,10 @@ import org.springframework.web.bind.annotation.*
description = "Interaction with CBA.")
open class ExecutionServiceController {
+ private val log = logger(ExecutionServiceController::class)
+
+ private val ph = Phaser(1)
+
@Autowired
lateinit var executionServiceHandler: ExecutionServiceHandler
@@ -90,7 +97,18 @@ open class ExecutionServiceController {
if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) {
throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.")
}
+
+ ph.register()
val processResult = executionServiceHandler.doProcess(executionServiceInput)
+ ph.arriveAndDeregister()
ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code))
}
+
+ @PreDestroy
+ fun preDestroy() {
+ val name = "ExecutionServiceController"
+ log.info("Starting to shutdown $name waiting for in-flight requests to finish ...")
+ ph.arriveAndAwaitAdvance()
+ log.info("Done waiting in $name")
+ }
}