diff options
author | Brinda Santh <brindasanth@in.ibm.com> | 2019-09-05 11:06:48 -0400 |
---|---|---|
committer | Brinda Santh <brindasanth@in.ibm.com> | 2019-09-05 11:06:48 -0400 |
commit | a2d60124bd44c767c42f4aa4ac8a7edafbfa3e39 (patch) | |
tree | 45cd9390e65bf1b3851f1e00f7a4cdf21054b93e /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org | |
parent | 8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (diff) |
Add Config based blueprint process consumer
Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497
Issue-ID: CCSDK-1668
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org')
3 files changed, 108 insertions, 130 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt new file mode 100644 index 000000000..b339903c5 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -0,0 +1,108 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.event.ApplicationReadyEvent +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import javax.annotation.PreDestroy + +@ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"], + havingValue = "true") +@Service +open class BluePrintProcessingKafkaConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val executionServiceHandler: ExecutionServiceHandler) { + + val log = logger(BluePrintProcessingKafkaConsumer::class) + + private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService + + companion object { + const val CONSUMER_SELECTOR = "self-service-api" + const val PRODUCER_SELECTOR = "self-service-api" + } + + @EventListener(ApplicationReadyEvent::class) + fun setupMessageListener() = runBlocking { + try { + log.info("Setting up message consumer($CONSUMER_SELECTOR) and " + + "message producer($PRODUCER_SELECTOR)...") + + /** Get the Message Consumer Service **/ + blueprintMessageConsumerService = try { + bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(CONSUMER_SELECTOR) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create consumer service ${e.message}") + } + + /** Get the Message Producer Service **/ + val blueprintMessageProducerService = try { + bluePrintMessageLibPropertyService + .blueprintMessageProducerService(PRODUCER_SELECTOR) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + + launch { + /** Subscribe to the consumer topics */ + val additionalConfig: MutableMap<String, Any> = hashMapOf() + val channel = blueprintMessageConsumerService.subscribe(additionalConfig) + channel.consumeEach { message -> + launch { + try { + log.trace("Consumed Message : $message") + val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + //TODO("In future, Message publisher configuration vary with respect to request") + /** Send the response message */ + blueprintMessageProducerService.sendMessage(executionServiceOutput) + } catch (e: Exception) { + log.error("failed in processing the consumed message : $message", e) + } + } + } + } + } catch (e: Exception) { + log.error("failed to start message consumer($CONSUMER_SELECTOR) and " + + "message producer($PRODUCER_SELECTOR) ", e) + } + } + + @PreDestroy + fun shutdownMessageListener() = runBlocking { + try { + log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " + + "message producer($PRODUCER_SELECTOR)...") + blueprintMessageConsumerService.shutDown() + } catch (e: Exception) { + log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e) + } + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt deleted file mode 100644 index 17e157d15..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt +++ /dev/null @@ -1,58 +0,0 @@ -package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api - -import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory -import org.springframework.kafka.core.ConsumerFactory -import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 -import org.springframework.kafka.support.serializer.JsonDeserializer - -@Configuration -open class MessagingConfig { - - @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}") - lateinit var groupId: String - - @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}") - lateinit var bootstrapServers: String - - open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? { - val configProperties = hashMapOf<String, Any>() - configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java - configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name - - val deserializer = JsonDeserializer<ExecutionServiceInput>() - deserializer.setRemoveTypeHeaders(true) - deserializer.addTrustedPackages("*") - - val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java, - ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)) - - return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), - ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer)) - } - - /** - * Creation of a Kafka MessageListener Container - * - * @return KafkaListener instance. - */ - @Bean - open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> { - val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>() - factory.consumerFactory = consumerFactory() - return factory - } -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt deleted file mode 100644 index e848ed384..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright © 2019 Bell Canada - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api - -import kotlinx.coroutines.async -import kotlinx.coroutines.runBlocking -import org.apache.commons.lang3.builder.ToStringBuilder -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.slf4j.LoggerFactory -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty -import org.springframework.kafka.annotation.KafkaListener -import org.springframework.stereotype.Service - -//TODO("Implement with property service and remove spring bindings") -@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true") -@Service -open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService, - private val executionServiceHandler: ExecutionServiceHandler) { - - private val log = LoggerFactory.getLogger(MessagingController::class.java)!! - - companion object { - // TODO PREFIX should be retrieved from model or from request. - const val PREFIX = "self-service-api" - const val EXECUTION_STATUS = 200 - } - - @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"]) - open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) { - - runBlocking { - log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value())) - - // Process the message. - async { - processMessage(record.value()) - }.await() - } - } - - private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) { - - val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - - if (executionServiceOutput.status.code == EXECUTION_STATUS) { - val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX) - - val payload = executionServiceOutput.payload - - log.info("The payload to publish is {}", payload) - - blueprintMessageProducerService.sendMessage(payload) - } else { - log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage) - } - } -} |