summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main
diff options
context:
space:
mode:
authorprathameshmo <prathamesh.morde@bell.ca>2019-06-11 17:47:42 -0400
committerprathameshmo <prathamesh.morde@bell.ca>2019-06-25 17:03:44 -0400
commitb134a815fbb3404e46551f8f2361e6cca6c7728d (patch)
tree1c2df5a0b010146dc5310b2e64f884fda1eea833 /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main
parenta6fae85764a8dfbeba6000a060b8be0f21fb0466 (diff)
Kafka Messaging Controller API.
Things done- Addressed review comments. Logic to consume events and process it. Added integration testing. Change-Id: If574a363f9fb8581018cc5a7ba106251a9d8caf1 Issue-ID:CCSDK-1356 Signed-off-by: prathamesh morde <prathamesh.morde@bell.ca> Signed-off-by: prathameshmo <prathamesh.morde@bell.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt47
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt74
2 files changed, 121 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
new file mode 100644
index 000000000..a04a79921
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
@@ -0,0 +1,47 @@
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.support.serializer.JsonDeserializer
+
+@Configuration
+open class MessagingConfig {
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
+ lateinit var groupId: String
+
+ @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+ lateinit var bootstrapServers: String
+
+ open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
+ val configProperties = hashMapOf<String, Any>()
+ configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+ configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
+ configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+ return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java))
+ }
+
+ /**
+ * Creation of a Kafka MessageListener Container
+ *
+ * @return KafkaListener instance.
+ */
+ @Bean
+ open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
+ val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
+ factory.consumerFactory = consumerFactory()
+ return factory
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
new file mode 100644
index 000000000..1d219a83e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.async
+import kotlinx.coroutines.runBlocking
+import org.apache.commons.lang3.builder.ToStringBuilder
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.stereotype.Service
+
+@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
+@Service
+open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
+ private val executionServiceHandler: ExecutionServiceHandler) {
+
+ private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
+
+ companion object {
+ // TODO PREFIX should be retrieved from model or from request.
+ const val PREFIX = "self-service-api"
+ const val EXECUTION_STATUS = 200
+ }
+
+ @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
+ open fun receive(input: ExecutionServiceInput) {
+
+ log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+
+ runBlocking {
+ log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+
+ // Process the message.
+ async {
+ processMessage(input)
+ }
+ }
+ }
+
+ private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
+
+ val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+
+ if (executionServiceOutput.status.code == EXECUTION_STATUS) {
+ val bluePrintMessageClientService = propertyService
+ .blueprintMessageClientService(PREFIX)
+
+ val payload = executionServiceOutput.payload
+
+ log.info("The payload to publish is {}", payload)
+
+ bluePrintMessageClientService.sendMessage(payload)
+ }
+ else {
+ log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
+ }
+ }
+}