summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main
diff options
context:
space:
mode:
authorBrinda Santh <brindasanth@in.ibm.com>2019-09-05 11:06:48 -0400
committerBrinda Santh <brindasanth@in.ibm.com>2019-09-05 11:06:48 -0400
commita2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 (patch)
tree45cd9390e65bf1b3851f1e00f7a4cdf21054b93e /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main
parent8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (diff)
Add Config based blueprint process consumer
Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497 Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt108
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt58
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt72
3 files changed, 108 insertions, 130 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
new file mode 100644
index 000000000..b339903c5
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.event.ApplicationReadyEvent
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Service
+import javax.annotation.PreDestroy
+
+@ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
+ havingValue = "true")
+@Service
+open class BluePrintProcessingKafkaConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler) {
+
+ val log = logger(BluePrintProcessingKafkaConsumer::class)
+
+ private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
+
+ companion object {
+ const val CONSUMER_SELECTOR = "self-service-api"
+ const val PRODUCER_SELECTOR = "self-service-api"
+ }
+
+ @EventListener(ApplicationReadyEvent::class)
+ fun setupMessageListener() = runBlocking {
+ try {
+ log.info("Setting up message consumer($CONSUMER_SELECTOR) and " +
+ "message producer($PRODUCER_SELECTOR)...")
+
+ /** Get the Message Consumer Service **/
+ blueprintMessageConsumerService = try {
+ bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(CONSUMER_SELECTOR)
+ } catch (e: Exception) {
+ throw BluePrintProcessorException("failed to create consumer service ${e.message}")
+ }
+
+ /** Get the Message Producer Service **/
+ val blueprintMessageProducerService = try {
+ bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService(PRODUCER_SELECTOR)
+ } catch (e: Exception) {
+ throw BluePrintProcessorException("failed to create producer service ${e.message}")
+ }
+
+ launch {
+ /** Subscribe to the consumer topics */
+ val additionalConfig: MutableMap<String, Any> = hashMapOf()
+ val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
+ channel.consumeEach { message ->
+ launch {
+ try {
+ log.trace("Consumed Message : $message")
+ val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+ val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+ //TODO("In future, Message publisher configuration vary with respect to request")
+ /** Send the response message */
+ blueprintMessageProducerService.sendMessage(executionServiceOutput)
+ } catch (e: Exception) {
+ log.error("failed in processing the consumed message : $message", e)
+ }
+ }
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed to start message consumer($CONSUMER_SELECTOR) and " +
+ "message producer($PRODUCER_SELECTOR) ", e)
+ }
+ }
+
+ @PreDestroy
+ fun shutdownMessageListener() = runBlocking {
+ try {
+ log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
+ "message producer($PRODUCER_SELECTOR)...")
+ blueprintMessageConsumerService.shutDown()
+ } catch (e: Exception) {
+ log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
+ }
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
deleted file mode 100644
index 17e157d15..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-
-import com.fasterxml.jackson.databind.DeserializationFeature
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
-import org.springframework.kafka.core.ConsumerFactory
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
-import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
-import org.springframework.kafka.support.serializer.JsonDeserializer
-
-@Configuration
-open class MessagingConfig {
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
- lateinit var groupId: String
-
- @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
- lateinit var bootstrapServers: String
-
- open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
- val configProperties = hashMapOf<String, Any>()
- configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
- configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
- configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
-
- val deserializer = JsonDeserializer<ExecutionServiceInput>()
- deserializer.setRemoveTypeHeaders(true)
- deserializer.addTrustedPackages("*")
-
- val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java,
- ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
-
- return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
- ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
- }
-
- /**
- * Creation of a Kafka MessageListener Container
- *
- * @return KafkaListener instance.
- */
- @Bean
- open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
- val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
- factory.consumerFactory = consumerFactory()
- return factory
- }
-} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
deleted file mode 100644
index e848ed384..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-
-import kotlinx.coroutines.async
-import kotlinx.coroutines.runBlocking
-import org.apache.commons.lang3.builder.ToStringBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.slf4j.LoggerFactory
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
-import org.springframework.kafka.annotation.KafkaListener
-import org.springframework.stereotype.Service
-
-//TODO("Implement with property service and remove spring bindings")
-@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
-@Service
-open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
- private val executionServiceHandler: ExecutionServiceHandler) {
-
- private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
-
- companion object {
- // TODO PREFIX should be retrieved from model or from request.
- const val PREFIX = "self-service-api"
- const val EXECUTION_STATUS = 200
- }
-
- @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
- open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
-
- runBlocking {
- log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
-
- // Process the message.
- async {
- processMessage(record.value())
- }.await()
- }
- }
-
- private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
-
- val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-
- if (executionServiceOutput.status.code == EXECUTION_STATUS) {
- val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX)
-
- val payload = executionServiceOutput.payload
-
- log.info("The payload to publish is {}", payload)
-
- blueprintMessageProducerService.sendMessage(payload)
- } else {
- log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
- }
- }
-}