From d6946b8a803896ea9797db961e7b97d2f6470e8e Mon Sep 17 00:00:00 2001 From: janani b Date: Mon, 4 Feb 2019 12:05:22 +0530 Subject: Dmaap Publisher base code Dmaap publisher code for any component to create a session with the message-router Issue-ID: CCSDK-693 Change-Id: Ia60079e2b013ea623f776ff7db9dea4bed4636df Signed-off-by: janani b --- .../dmaap/DmaapEventPublisher.kt | 181 +++++++++++++++++++++ .../dmaap/EnvironmentContext.kt | 56 +++++++ .../blueprintsprocessor/dmaap/EventPublisher.kt | 39 +++++ 3 files changed, 276 insertions(+) create mode 100644 ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/DmaapEventPublisher.kt create mode 100644 ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EnvironmentContext.kt create mode 100644 ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EventPublisher.kt (limited to 'ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/DmaapEventPublisher.kt b/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/DmaapEventPublisher.kt new file mode 100644 index 000000000..7c686f089 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/DmaapEventPublisher.kt @@ -0,0 +1,181 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CDS + * ================================================================================ + * Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.apps.blueprintsprocessor.dmaap + +import com.att.nsa.mr.client.MRBatchingPublisher +import com.att.nsa.mr.client.MRClientFactory +import com.att.nsa.mr.client.MRPublisher +import org.slf4j.LoggerFactory +import org.springframework.boot.context.properties.bind.Binder +import org.springframework.boot.context.properties.source.ConfigurationPropertySources +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.PropertySource +import org.springframework.context.annotation.PropertySources +import org.springframework.core.env.ConfigurableEnvironment +import org.springframework.core.env.Environment +import org.springframework.core.io.support.ResourcePropertySource +import java.io.IOException +import java.util.Properties +import java.util.concurrent.TimeUnit + +/** + * Representation of DMaap event publisher, to create a session with the + * message router and send messages when asked for. The producer.properties + * is used for creating a session. In order to overwrite the parameters such + * as host, topic, username and password, the event.properties can be used. + * + * compName : Name of the component appended in the event.properties file + * to overwrite. + * (E.g., so.topic=cds_so : In this "so" is the component name) + */ +@Configuration +@PropertySources(PropertySource("classpath:event.properties", + "classpath:producer.properties")) +open class DmaapEventPublisher(compName: String = ""): EventPublisher { + + /** + * Static variable for logging. + */ + companion object { + var log = LoggerFactory.getLogger(DmaapEventPublisher::class.java)!! + } + + /** + * The component name used in defining the event.properties file. + */ + private var cName:String? = null + + /** + * List of topics for a given message to be sent. + */ + var topics = mutableListOf() + + /** + * List of clients formed for the list of topics where the messages has to + * be sent. + */ + var clients = mutableListOf() + + /** + * The populated values from producer.properties which are overwritten + * by the event.properties values according to the component information. + */ + var prodProps: Properties = Properties() + + + init { + cName = compName + } + + /** + * Loads the producer.properties file and populates all the parameters + * and then loads the event.properties file and populates the finalized + * parameters such as host, topic, username and password if available for + * the specified component. With this updated producer.properties, for + * each topic a client will be created. + */ + private fun loadPropertiesInfo() { + if (prodProps.isEmpty) { + parseEventProps(cName!!) + addClients() + } + } + + /** + * Adds clients for each topic into a client list. + */ + private fun addClients() { + for (topic in topics) { + prodProps.setProperty("topic", topic) + val client = MRClientFactory.createBatchingPublisher(prodProps) + clients.add(client) + } + } + + /** + * Parses the event.properties file and update it into the producer + * .properties, where both the files are loaded and stored. + */ + private fun parseEventProps(cName: String) { + val env = EnvironmentContext.env as Environment + val propSrc = ConfigurationPropertySources.get(env) + val proProps = (env as ConfigurableEnvironment).propertySources.get( + "class path resource [producer.properties]") + + if (proProps != null) { + val entries = (proProps as ResourcePropertySource).source.entries + for (e in entries) { + prodProps.put(e.key, e.value) + } + } else { + log.info("Unable to load the producer.properties file") + } + + val eProps = Binder(propSrc).bind(cName, Properties::class.java).get() + val top = eProps.get("topic").toString() + if (top != "") { + topics.addAll(top.split(",")) + } + prodProps.putAll(eProps) + } + + /** + * Sends message to the sessions created by the information provided in + * the producer.properties file. + */ + override fun sendMessage(partition: String , messages: Collection): + Boolean { + loadPropertiesInfo() + var success = true + val dmaapMsgs = mutableListOf() + for (m in messages) { + dmaapMsgs.add(MRPublisher.message(partition, m)) + } + for (client in clients) { + log.info("Sending messages to the DMaap Server") + try { + client.send(dmaapMsgs) + } catch (e: IOException) { + log.error(e.message, e) + success = false + } + } + return success + } + + /** + * Closes the opened session that was used for sending messages. + */ + override fun close(timeout: Long) { + log.debug("Closing the DMaap producer clients") + if (!clients.isEmpty()) { + for (client in clients) { + try { + client.close(timeout, TimeUnit.SECONDS) + } catch (e : IOException) { + log.warn("Unable to cleanly close the connection from " + + "the client $client", e) + } + } + } + } + +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EnvironmentContext.kt b/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EnvironmentContext.kt new file mode 100644 index 000000000..1d2a28ce8 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EnvironmentContext.kt @@ -0,0 +1,56 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CDS + * ================================================================================ + * Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.apps.blueprintsprocessor.dmaap + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.core.env.Environment +import org.springframework.stereotype.Component +import javax.annotation.PostConstruct + +/** + * Abstraction of environment context information component. + */ +@Component +class EnvironmentContext { + + /** + * Environment information. + */ + companion object { + var env: Environment? = null + } + + /** + * Environment auto-wired information. + */ + @Autowired + var environment: Environment? = null + + /** + * Initiates the static variable after the instantiation takes place to + * the auto-wired variable. + */ + @PostConstruct + private fun initStaticContext() { + env = environment + } + +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EventPublisher.kt b/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EventPublisher.kt new file mode 100644 index 000000000..7d02e806c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/dmaap-lib/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/dmaap/EventPublisher.kt @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CDS + * ================================================================================ + * Copyright (C) 2019 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.apps.blueprintsprocessor.dmaap + +/** + * Abstraction of a publisher, to send messages with the given partition in a + * session and closing the same. + */ +interface EventPublisher { + + /** + * Sends messages through a session on a given partition. + */ + fun sendMessage(partition: String, messages: Collection): Boolean + + /** + * Closes the session with the given time. + */ + fun close(timeout: Long) + +} -- cgit 1.2.3-korg