From 0042e9e09d4d43422459f9b1574e61d6c625099d Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 20 May 2019 15:27:55 -0400 Subject: Add Kafka client services Change-Id: I76cf52f6df10e114539c4d65620f431e0f747644 Issue-ID: CCSDK-1349 Signed-off-by: Brinda Santh --- .../message/BluePrintMessageLibConfiguration.kt | 35 ++++++++ .../message/BluePrintMessageLibData.kt | 27 +++++++ .../service/BluePrintMessageLibPropertyService.kt | 84 +++++++++++++++++++ .../service/BlueprintMessageProducerService.kt | 34 ++++++++ .../KafkaBasicAuthMessageProducerService.kt | 94 ++++++++++++++++++++++ 5 files changed, 274 insertions(+) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt new file mode 100644 index 000000000..644c51860 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -0,0 +1,35 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + + +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +@EnableConfigurationProperties +open class BluePrintMessageLibConfiguration + +class MessageLibConstants { + companion object { + const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" + const val PROPERTY_MESSAGE_CLIENT_PREFIX = "blueprintsprocessor.messageclient." + const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt new file mode 100644 index 000000000..e621ec66f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -0,0 +1,27 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + + +open class MessageProducerProperties + + +open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { + lateinit var bootstrapServers: String + var topic: String? = null + var clientId: String? = null +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt new file mode 100644 index 000000000..fb01ce179 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.springframework.stereotype.Service + +@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) +open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) { + + fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService { + val messageClientProperties = messageClientProperties(jsonNode) + return blueprintMessageClientService(messageClientProperties) + } + + fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService { + val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector" + val messageClientProperties = messageClientProperties(prefix) + return blueprintMessageClientService(messageClientProperties) + } + + fun messageClientProperties(prefix: String): MessageProducerProperties { + val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java) + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + kafkaBasicAuthMessageClientProperties(prefix) + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties { + val type = jsonNode.get("type").textValue() + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties) + : BlueprintMessageProducerService { + + when (MessageProducerProperties) { + is KafkaBasicAuthMessageProducerProperties -> { + return KafkaBasicAuthMessageProducerService(MessageProducerProperties) + } + else -> { + throw BluePrintProcessorException("couldn't get Message client service for") + } + } + } + + private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { + return bluePrintProperties.propertyBeanType( + prefix, KafkaBasicAuthMessageProducerProperties::class.java) + } + +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt new file mode 100644 index 000000000..e33d41c09 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -0,0 +1,34 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.runBlocking + +interface BlueprintMessageProducerService { + + fun sendMessage(message: Any): Boolean = runBlocking { + sendMessageNB(message) + } + + fun sendMessage(topic: String, message: Any): Boolean = runBlocking { + sendMessageNB(topic, message) + } + + suspend fun sendMessageNB(message: Any): Boolean + + suspend fun sendMessageNB(topic: String, message: Any): Boolean +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt new file mode 100644 index 000000000..52ac346db --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -0,0 +1,94 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.slf4j.LoggerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.SendResult +import org.springframework.util.concurrent.ListenableFutureCallback + + +class KafkaBasicAuthMessageProducerService( + private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) + : BlueprintMessageProducerService { + + private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! + + private var kafkaTemplate: KafkaTemplate? = null + + override suspend fun sendMessageNB(message: Any): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessage(messageProducerProperties.topic!!, message) + } + + override suspend fun sendMessageNB(topic: String, message: Any): Boolean { + val serializedMessage = when (message) { + is String -> { + message + } + else -> { + message.asJsonType().toString() + } + } + val future = messageTemplate().send(topic, serializedMessage) + + future.addCallback(object : ListenableFutureCallback> { + override fun onSuccess(result: SendResult) { + log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]") + } + + override fun onFailure(ex: Throwable) { + log.error("Unable to send message", ex) + } + }) + return true + } + + + private fun producerFactory(additionalConfig: Map? = null): ProducerFactory { + log.info("Client Properties : $messageProducerProperties") + val configProps = hashMapOf() + configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers + configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + if (messageProducerProperties.clientId != null) { + configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! + } + // TODO("Security Implementation based on type") + + // Add additional Properties + if (additionalConfig != null) { + configProps.putAll(additionalConfig) + } + return DefaultKafkaProducerFactory(configProps) + } + + fun messageTemplate(additionalConfig: Map? = null): KafkaTemplate { + log.info("Prepering templates") + if (kafkaTemplate == null) { + kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) + } + return kafkaTemplate!! + } +} + -- cgit 1.2.3-korg