diff options
author | prathameshmo <prathamesh.morde@bell.ca> | 2019-06-11 17:47:42 -0400 |
---|---|---|
committer | prathameshmo <prathamesh.morde@bell.ca> | 2019-06-25 17:03:44 -0400 |
commit | b134a815fbb3404e46551f8f2361e6cca6c7728d (patch) | |
tree | 1c2df5a0b010146dc5310b2e64f884fda1eea833 /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main | |
parent | a6fae85764a8dfbeba6000a060b8be0f21fb0466 (diff) |
Kafka Messaging Controller API.
Things done-
Addressed review comments.
Logic to consume events and process it.
Added integration testing.
Change-Id: If574a363f9fb8581018cc5a7ba106251a9d8caf1
Issue-ID:CCSDK-1356
Signed-off-by: prathamesh morde <prathamesh.morde@bell.ca>
Signed-off-by: prathameshmo <prathamesh.morde@bell.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main')
2 files changed, 121 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt new file mode 100644 index 000000000..a04a79921 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt @@ -0,0 +1,47 @@ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.JsonDeserializer + +@Configuration +open class MessagingConfig { + + @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") + lateinit var groupId: String + + @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") + lateinit var bootstrapServers: String + + open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? { + val configProperties = hashMapOf<String, Any>() + configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name + configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java)) + } + + /** + * Creation of a Kafka MessageListener Container + * + * @return KafkaListener instance. + */ + @Bean + open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { + val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() + factory.consumerFactory = consumerFactory() + return factory + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt new file mode 100644 index 000000000..1d219a83e --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt @@ -0,0 +1,74 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import kotlinx.coroutines.async +import kotlinx.coroutines.runBlocking +import org.apache.commons.lang3.builder.ToStringBuilder +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.stereotype.Service + +@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true") +@Service +open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler) { + + private val log = LoggerFactory.getLogger(MessagingController::class.java)!! + + companion object { + // TODO PREFIX should be retrieved from model or from request. + const val PREFIX = "self-service-api" + const val EXECUTION_STATUS = 200 + } + + @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"]) + open fun receive(input: ExecutionServiceInput) { + + log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + + runBlocking { + log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + + // Process the message. + async { + processMessage(input) + } + } + } + + private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) { + + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + + if (executionServiceOutput.status.code == EXECUTION_STATUS) { + val bluePrintMessageClientService = propertyService + .blueprintMessageClientService(PREFIX) + + val payload = executionServiceOutput.payload + + log.info("The payload to publish is {}", payload) + + bluePrintMessageClientService.sendMessage(payload) + } + else { + log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage) + } + } +} |