From 45263f50896a7021cd17d78ce83b29365cb19c29 Mon Sep 17 00:00:00 2001 From: Jozsef Csongvai Date: Mon, 26 Jul 2021 12:00:59 -0400 Subject: Revert "Renaming Files having BluePrint to have Blueprint" The renaming in CCSDK-3098 caused breaking changes to the grpc api and compile issues for kotlin scripts. Issue-ID: CCSDK-3385 Change-Id: I0d745cb858371678eabcb2284671c1fd76a1ab6d Signed-off-by: Jozsef Csongvai --- .../message/BluePrintMessageExtensions.kt | 31 +++ .../message/BluePrintMessageLibConfiguration.kt | 71 ++++++ .../message/BluePrintMessageLibData.kt | 264 +++++++++++++++++++++ .../message/BlueprintMessageExtensions.kt | 31 --- .../message/BlueprintMessageLibConfiguration.kt | 71 ------ .../message/BlueprintMessageLibData.kt | 264 --------------------- .../message/MessagePropertiesDSL.kt | 94 ++++---- .../kafka/AbstractKafkaTopologyComponents.kt | 6 +- .../message/kafka/KafkaJDBCStores.kt | 8 +- .../service/BluePrintMessageLibPropertyService.kt | 228 ++++++++++++++++++ .../service/BlueprintMessageConsumerService.kt | 6 +- .../service/BlueprintMessageLibPropertyService.kt | 228 ------------------ .../message/service/KafkaStreamsConsumerService.kt | 6 +- .../message/service/MessageLoggerService.kt | 14 +- .../message/utils/BlueprintMessageUtils.kt | 4 +- 15 files changed, 663 insertions(+), 663 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageExtensions.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibConfiguration.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt new file mode 100644 index 000000000..509689eca --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -0,0 +1,31 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import java.nio.charset.Charset + +fun T?.toMap(): MutableMap { + val map: MutableMap = hashMapOf() + this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } + return map +} + +fun Headers.addHeader(key: String, value: String) { + this.add(RecordHeader(key, value.toByteArray())) +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt new file mode 100644 index 000000000..659295a6b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -0,0 +1,71 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +@EnableConfigurationProperties +open class BluePrintMessageLibConfiguration + +/** + * Exposed Dependency Service by this Message Lib Module + */ +fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService = + instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) + +/** Extension functions for message producer service **/ +fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService { + return messageLibPropertyService().blueprintMessageProducerService(selector) +} + +fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { + return messageLibPropertyService().blueprintMessageProducerService(jsonNode) +} + +/** Extension functions for message consumer service **/ +fun BluePrintDependencyService.messageConsumerService(selector: String): BlueprintMessageConsumerService { + return messageLibPropertyService().blueprintMessageConsumerService(selector) +} + +fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { + return messageLibPropertyService().blueprintMessageConsumerService(jsonNode) +} + +class MessageLibConstants { + companion object { + + const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" + const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." + const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." + const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth" + const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth" + const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" + const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth" + const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth" + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt new file mode 100644 index 000000000..67dba1f19 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -0,0 +1,264 @@ +/* + * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.ScramLoginModule +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.streams.StreamsConfig + +/** Common Properties **/ +abstract class CommonProperties { + + lateinit var type: String + lateinit var topic: String + lateinit var bootstrapServers: String + + open fun getConfig(): HashMap { + val configProps = hashMapOf() + configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + return configProps + } +} + +/** Message Producer */ +/** Message Producer Properties **/ +abstract class MessageProducerProperties : CommonProperties() + +/** Basic Auth */ +open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { + + var clientId: String? = null + var acks: String = "all" // strongest producing guarantee + var maxBlockMs: Int = 250 // max blocking time in ms to send a message + var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour) + var enableIdempotence: Boolean = true // ensure we don't push duplicates + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java + configProps[ProducerConfig.ACKS_CONFIG] = acks + configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs + configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs + configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence + if (clientId != null) { + configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! + } + return configProps + } +} + +/** SSL Auth */ +open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { + + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() { + + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Consumer */ +abstract class MessageConsumerProperties : CommonProperties() +/** Kafka Streams */ +/** Streams properties */ + +/** Basic Auth */ +open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() { + + lateinit var applicationId: String + var autoOffsetReset: String = "latest" + var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE + + override fun getConfig(): HashMap { + val configProperties = super.getConfig() + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee + return configProperties + } +} + +/** SSL Auth */ +open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() { + + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() { + + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} + +/** Message Consumer */ +/** Message Consumer Properties **/ +/** Basic Auth */ +open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() { + + lateinit var groupId: String + lateinit var clientId: String + var autoCommit: Boolean = true + var autoOffsetReset: String = "latest" + var pollMillSec: Long = 1000 + var pollRecords: Int = -1 + + override fun getConfig(): HashMap { + val configProperties = super.getConfig() + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId + + /** To handle Back pressure, Get only configured record for processing */ + if (pollRecords > 0) { + configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords + } + + return configProperties + } +} + +/** SSL Auth */ +open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { + + lateinit var truststore: String + lateinit var truststorePassword: String + var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + var keystore: String? = null + var keystorePassword: String? = null + var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() + configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType + configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! + configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! + if (keystore != null) { + configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! + configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType + configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! + } + configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm + return configProps + } +} + +/** (SASL) SCRAM SSL Auth */ +class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() { + + var saslMechanism: String = "SCRAM-SHA-512" + lateinit var scramUsername: String + lateinit var scramPassword: String + + override fun getConfig(): HashMap { + val configProps = super.getConfig() + configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() + configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism + configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + + "username=\"${scramUsername}\" " + + "password=\"${scramPassword}\";" + return configProps + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageExtensions.kt deleted file mode 100644 index 509689eca..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageExtensions.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message - -import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.header.internals.RecordHeader -import java.nio.charset.Charset - -fun T?.toMap(): MutableMap { - val map: MutableMap = hashMapOf() - this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } - return map -} - -fun Headers.addHeader(key: String, value: String) { - this.add(RecordHeader(key, value.toByteArray())) -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibConfiguration.kt deleted file mode 100644 index 88931d13f..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibConfiguration.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message - -import com.fasterxml.jackson.databind.JsonNode -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService -import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService -import org.springframework.boot.context.properties.EnableConfigurationProperties -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration - -@Configuration -@ComponentScan -@EnableConfigurationProperties -open class BlueprintMessageLibConfiguration - -/** - * Exposed Dependency Service by this Message Lib Module - */ -fun BlueprintDependencyService.messageLibPropertyService(): BlueprintMessageLibPropertyService = - instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) - -/** Extension functions for message producer service **/ -fun BlueprintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService { - return messageLibPropertyService().blueprintMessageProducerService(selector) -} - -fun BlueprintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { - return messageLibPropertyService().blueprintMessageProducerService(jsonNode) -} - -/** Extension functions for message consumer service **/ -fun BlueprintDependencyService.messageConsumerService(selector: String): BlueprintMessageConsumerService { - return messageLibPropertyService().blueprintMessageConsumerService(selector) -} - -fun BlueprintDependencyService.messageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { - return messageLibPropertyService().blueprintMessageConsumerService(jsonNode) -} - -class MessageLibConstants { - companion object { - - const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" - const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." - const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." - const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" - const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth" - const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth" - const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" - const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth" - const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth" - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt deleted file mode 100644 index 67dba1f19..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageLibData.kt +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message - -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.config.SslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.security.scram.ScramLoginModule -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer -import org.apache.kafka.streams.StreamsConfig - -/** Common Properties **/ -abstract class CommonProperties { - - lateinit var type: String - lateinit var topic: String - lateinit var bootstrapServers: String - - open fun getConfig(): HashMap { - val configProps = hashMapOf() - configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - return configProps - } -} - -/** Message Producer */ -/** Message Producer Properties **/ -abstract class MessageProducerProperties : CommonProperties() - -/** Basic Auth */ -open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { - - var clientId: String? = null - var acks: String = "all" // strongest producing guarantee - var maxBlockMs: Int = 250 // max blocking time in ms to send a message - var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour) - var enableIdempotence: Boolean = true // ensure we don't push duplicates - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java - configProps[ProducerConfig.ACKS_CONFIG] = acks - configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs - configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs - configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence - if (clientId != null) { - configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! - } - return configProps - } -} - -/** SSL Auth */ -open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() { - - lateinit var truststore: String - lateinit var truststorePassword: String - var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE - var keystore: String? = null - var keystorePassword: String? = null - var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() - configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! - if (keystore != null) { - configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! - configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType - configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! - } - configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm - - return configProps - } -} - -/** (SASL) SCRAM SSL Auth */ -class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() { - - var saslMechanism: String = "SCRAM-SHA-512" - lateinit var scramUsername: String - lateinit var scramPassword: String - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() - configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism - configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"${scramUsername}\" " + - "password=\"${scramPassword}\";" - return configProps - } -} - -/** Consumer */ -abstract class MessageConsumerProperties : CommonProperties() -/** Kafka Streams */ -/** Streams properties */ - -/** Basic Auth */ -open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() { - - lateinit var applicationId: String - var autoOffsetReset: String = "latest" - var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE - - override fun getConfig(): HashMap { - val configProperties = super.getConfig() - configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset - configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee - return configProperties - } -} - -/** SSL Auth */ -open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() { - - lateinit var truststore: String - lateinit var truststorePassword: String - var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE - var keystore: String? = null - var keystorePassword: String? = null - var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() - configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! - if (keystore != null) { - configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! - configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType - configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! - } - configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm - return configProps - } -} - -/** (SASL) SCRAM SSL Auth */ -class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() { - - var saslMechanism: String = "SCRAM-SHA-512" - lateinit var scramUsername: String - lateinit var scramPassword: String - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() - configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism - configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"${scramUsername}\" " + - "password=\"${scramPassword}\";" - return configProps - } -} - -/** Message Consumer */ -/** Message Consumer Properties **/ -/** Basic Auth */ -open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() { - - lateinit var groupId: String - lateinit var clientId: String - var autoCommit: Boolean = true - var autoOffsetReset: String = "latest" - var pollMillSec: Long = 1000 - var pollRecords: Int = -1 - - override fun getConfig(): HashMap { - val configProperties = super.getConfig() - configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId - configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit - /** - * earliest: automatically reset the offset to the earliest offset - * latest: automatically reset the offset to the latest offset - */ - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId - - /** To handle Back pressure, Get only configured record for processing */ - if (pollRecords > 0) { - configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords - } - - return configProperties - } -} - -/** SSL Auth */ -open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() { - - lateinit var truststore: String - lateinit var truststorePassword: String - var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE - var keystore: String? = null - var keystorePassword: String? = null - var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString() - configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType - configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!! - configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!! - if (keystore != null) { - configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!! - configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType - configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!! - } - configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm - return configProps - } -} - -/** (SASL) SCRAM SSL Auth */ -class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() { - - var saslMechanism: String = "SCRAM-SHA-512" - lateinit var scramUsername: String - lateinit var scramPassword: String - - override fun getConfig(): HashMap { - val configProps = super.getConfig() - configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString() - configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism - configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " + - "username=\"${scramUsername}\" " + - "password=\"${scramPassword}\";" - return configProps - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index 527374f02..8b31de9b9 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -17,8 +17,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message import com.fasterxml.jackson.databind.JsonNode -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintTypes +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType import org.onap.ccsdk.cds.controllerblueprints.core.data.RelationshipType @@ -30,48 +30,48 @@ import org.onap.ccsdk.cds.controllerblueprints.core.dsl.relationshipType /** Relationships Types DSL for Message Producer */ fun ServiceTemplateBuilder.relationshipTypeConnectsToMessageProducer() { - val relationshipType = BlueprintTypes.relationshipTypeConnectsToMessageProducer() + val relationshipType = BluePrintTypes.relationshipTypeConnectsToMessageProducer() if (this.relationshipTypes == null) this.relationshipTypes = hashMapOf() this.relationshipTypes!![relationshipType.id!!] = relationshipType } -fun BlueprintTypes.relationshipTypeConnectsToMessageProducer(): RelationshipType { +fun BluePrintTypes.relationshipTypeConnectsToMessageProducer(): RelationshipType { return relationshipType( - id = BlueprintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, - version = BlueprintConstants.DEFAULT_VERSION_NUMBER, - derivedFrom = BlueprintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO, + id = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, + version = BluePrintConstants.DEFAULT_VERSION_NUMBER, + derivedFrom = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO, description = "Relationship connects to through message producer." ) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintConstants.DATA_TYPE_MAP, + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintConstants.DATA_TYPE_MAP, true, "Connection Config details." ) - validTargetTypes(arrayListOf(BlueprintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT)) + validTargetTypes(arrayListOf(BluePrintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT)) } } fun ServiceTemplateBuilder.relationshipTypeConnectsToMessageConsumer() { - val relationshipType = BlueprintTypes.relationshipTypeConnectsToMessageConsumer() + val relationshipType = BluePrintTypes.relationshipTypeConnectsToMessageConsumer() if (this.relationshipTypes == null) this.relationshipTypes = hashMapOf() this.relationshipTypes!![relationshipType.id!!] = relationshipType } -fun BlueprintTypes.relationshipTypeConnectsToMessageConsumer(): RelationshipType { +fun BluePrintTypes.relationshipTypeConnectsToMessageConsumer(): RelationshipType { return relationshipType( - id = BlueprintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, - version = BlueprintConstants.DEFAULT_VERSION_NUMBER, - derivedFrom = BlueprintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO, + id = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, + version = BluePrintConstants.DEFAULT_VERSION_NUMBER, + derivedFrom = BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO, description = "Relationship type connects to message consumer." ) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintConstants.DATA_TYPE_MAP, + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintConstants.DATA_TYPE_MAP, true, "Connection Config details." ) - validTargetTypes(arrayListOf(BlueprintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT)) + validTargetTypes(arrayListOf(BluePrintConstants.MODEL_TYPE_CAPABILITY_TYPE_ENDPOINT)) } } @@ -90,46 +90,46 @@ fun TopologyTemplateBuilder.relationshipTemplateMessageProducer( class MessageProducerRelationshipTemplateBuilder(name: String, description: String) : RelationshipTemplateBuilder( name, - BlueprintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, description + BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER, description ) { fun kafkaBasicAuth(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaBasicAuthMessageProducerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block) ) } fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaSslAuthMessageProducerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageProducerProperties(block) ) } fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaScramSslAuthMessageProducerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block) ) } } -fun BlueprintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaBasicAuthMessageProducerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_BASIC_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaSslAuthMessageProducerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaScramSslAuthMessageProducerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() @@ -245,88 +245,88 @@ fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer( class MessageConsumerRelationshipTemplateBuilder(name: String, description: String) : RelationshipTemplateBuilder( name, - BlueprintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, description + BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_CONSUMER, description ) { fun kafkaBasicAuth(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaBasicAuthMessageConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block) ) } fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaSslAuthMessageConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block) ) } fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaScramSslAuthMessageConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block) ) } fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaStreamsBasicAuthConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block) ) } fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaStreamsSslAuthConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block) ) } fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) { property( - BlueprintConstants.PROPERTY_CONNECTION_CONFIG, - BlueprintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) + BluePrintConstants.PROPERTY_CONNECTION_CONFIG, + BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block) ) } } -fun BlueprintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaBasicAuthMessageConsumerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_BASIC_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaSslAuthMessageConsumerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsSslAuthConsumerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive() return assignments.asJsonType() } -fun BlueprintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { +fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode { val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build() assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] = MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt index 23eca18a3..72a70893a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt @@ -24,9 +24,9 @@ import org.apache.kafka.streams.processor.Punctuator import org.onap.ccsdk.cds.controllerblueprints.core.logger /** CDS Kafka Stream Processor abstract class to implement */ -abstract class AbstractBlueprintMessageProcessor : Processor { +abstract class AbstractBluePrintMessageProcessor : Processor { - private val log = logger(AbstractBlueprintMessageProcessor::class) + private val log = logger(AbstractBluePrintMessageProcessor::class) lateinit var processorContext: ProcessorContext @@ -51,7 +51,7 @@ abstract class AbstractBlueprintMessageProcessor : Processor { } /** CDS Kafka Stream Punctuator abstract class to implement */ -abstract class AbstractBlueprintMessagePunctuator : Punctuator { +abstract class AbstractBluePrintMessagePunctuator : Punctuator { lateinit var processorContext: ProcessorContext diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt index ecf5870ab..86ccd74a2 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt @@ -21,10 +21,10 @@ import org.apache.kafka.streams.processor.ProcessorContext import org.apache.kafka.streams.processor.StateStore import org.apache.kafka.streams.state.StoreBuilder import org.apache.kafka.streams.state.StoreSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.db.BlueprintDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import java.util.* @@ -32,7 +32,7 @@ class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageProducerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageProducerProperties::class.java + ) + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties { + val type = jsonNode.get("type").textValue() + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + /** Consumer Property Lib Service Implementation **/ + + /** Return Message Consumer Service for [jsonNode] definitions. */ + fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { + val messageConsumerProperties = messageConsumerProperties(jsonNode) + return blueprintMessageConsumerService(messageConsumerProperties) + } + + /** Return Message Consumer Service for [selector] definitions. */ + fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService { + val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector" + val messageClientProperties = messageConsumerProperties(prefix) + return blueprintMessageConsumerService(messageClientProperties) + } + + /** Return Message Consumer Properties for [prefix] definitions. */ + fun messageConsumerProperties(prefix: String): MessageConsumerProperties { + val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) + return when (type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaBasicAuthMessageConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaSslAuthMessageConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaScramSslAuthMessageConsumerProperties::class.java + ) + } + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsSslAuthConsumerProperties::class.java + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + bluePrintPropertiesService.propertyBeanType( + prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java + ) + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { + val type = jsonNode.get("type").textValue() + return when (type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!! + } + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!! + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): + BlueprintMessageConsumerService { + + when (messageConsumerProperties.type) { + /** Message Consumer */ + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties, + meterRegistry + ) + } + MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaSslAuthMessageConsumerProperties, + meterRegistry + ) + } + MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { + return KafkaMessageConsumerService( + messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties, + meterRegistry + ) + } + /** Stream Consumer */ + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties + ) + } + MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { + return KafkaStreamsConsumerService( + messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties + ) + } + else -> { + throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 887a7a7b8..b39d89bfd 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException /** Consumer Function Interfaces */ interface ConsumerFunction @@ -47,7 +47,7 @@ interface BlueprintMessageConsumerService { /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { - throw BlueprintProcessorException("Not Implemented") + throw BluePrintProcessorException("Not Implemented") } /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ @@ -56,7 +56,7 @@ interface BlueprintMessageConsumerService { additionalConfig: Map?, consumerFunction: ConsumerFunction ) { - throw BlueprintProcessorException("Not Implemented") + throw BluePrintProcessorException("Not Implemented") } /** close the channel, consumer and other resources */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt deleted file mode 100644 index cb0bc3225..000000000 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.message.service - -import com.fasterxml.jackson.databind.JsonNode -import io.micrometer.core.instrument.MeterRegistry -import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import org.springframework.stereotype.Service - -@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) -open class BlueprintMessageLibPropertyService( - private var bluePrintPropertiesService: BlueprintPropertiesService, - private val meterRegistry: MeterRegistry -) { - - fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { - val messageClientProperties = messageProducerProperties(jsonNode) - return KafkaMessageProducerService(messageClientProperties, meterRegistry) - } - - fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { - val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" - val messageClientProperties = messageProducerProperties(prefix) - return KafkaMessageProducerService(messageClientProperties, meterRegistry) - } - - fun messageProducerProperties(prefix: String): MessageProducerProperties { - val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) - return when (type) { - MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageProducerProperties::class.java - ) - } - MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaSslAuthMessageProducerProperties::class.java - ) - } - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaScramSslAuthMessageProducerProperties::class.java - ) - } - else -> { - throw BlueprintProcessorException("Message adaptor($type) is not supported") - } - } - } - - fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties { - val type = jsonNode.get("type").textValue() - return when (type) { - MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!! - } - MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!! - } - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!! - } - else -> { - throw BlueprintProcessorException("Message adaptor($type) is not supported") - } - } - } - - /** Consumer Property Lib Service Implementation **/ - - /** Return Message Consumer Service for [jsonNode] definitions. */ - fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { - val messageConsumerProperties = messageConsumerProperties(jsonNode) - return blueprintMessageConsumerService(messageConsumerProperties) - } - - /** Return Message Consumer Service for [selector] definitions. */ - fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService { - val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector" - val messageClientProperties = messageConsumerProperties(prefix) - return blueprintMessageConsumerService(messageClientProperties) - } - - /** Return Message Consumer Properties for [prefix] definitions. */ - fun messageConsumerProperties(prefix: String): MessageConsumerProperties { - val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) - return when (type) { - /** Message Consumer */ - MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageConsumerProperties::class.java - ) - } - MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaSslAuthMessageConsumerProperties::class.java - ) - } - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaScramSslAuthMessageConsumerProperties::class.java - ) - } - /** Stream Consumer */ - MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsBasicAuthConsumerProperties::class.java - ) - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsSslAuthConsumerProperties::class.java - ) - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { - bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java - ) - } - else -> { - throw BlueprintProcessorException("Message adaptor($type) is not supported") - } - } - } - - fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { - val type = jsonNode.get("type").textValue() - return when (type) { - /** Message Consumer */ - MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! - } - MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!! - } - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!! - } - /** Stream Consumer */ - MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!! - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { - JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!! - } - else -> { - throw BlueprintProcessorException("Message adaptor($type) is not supported") - } - } - } - - private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): - BlueprintMessageConsumerService { - - when (messageConsumerProperties.type) { - /** Message Consumer */ - MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - return KafkaMessageConsumerService( - messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties, - meterRegistry - ) - } - MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> { - return KafkaMessageConsumerService( - messageConsumerProperties as KafkaSslAuthMessageConsumerProperties, - meterRegistry - ) - } - MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> { - return KafkaMessageConsumerService( - messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties, - meterRegistry - ) - } - /** Stream Consumer */ - MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { - return KafkaStreamsConsumerService( - messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - ) - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> { - return KafkaStreamsConsumerService( - messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties - ) - } - MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> { - return KafkaStreamsConsumerService( - messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties - ) - } - else -> { - throw BlueprintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}") - } - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt index beaa995df..4340e4815 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt @@ -20,7 +20,7 @@ import kotlinx.coroutines.channels.Channel import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.streams.KafkaStreams import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.util.Properties @@ -41,11 +41,11 @@ open class KafkaStreamsConsumerService(private val messageConsumerProperties: Me } override suspend fun subscribe(additionalConfig: Map?): Channel> { - throw BlueprintProcessorException("not implemented") + throw BluePrintProcessorException("not implemented") } override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel> { - throw BlueprintProcessorException("not implemented") + throw BluePrintProcessorException("not implemented") } override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt index 7e5f35d15..90b850017 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -21,7 +21,7 @@ import org.apache.kafka.common.header.Headers import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -46,9 +46,9 @@ class MessageLoggerService { fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { val headers = consumerRecord.headers().toMap() - val requestID = headers[BlueprintConstants.ONAP_REQUEST_ID].defaultToUUID() - val invocationID = headers[BlueprintConstants.ONAP_INVOCATION_ID].defaultToUUID() - val partnerName = headers[BlueprintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" + val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID() + val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID() + val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" messageConsuming(requestID, invocationID, partnerName, consumerRecord) } @@ -83,9 +83,9 @@ class MessageLoggerService { */ fun messageProducing(requestHeader: Headers) { val localhost = InetAddress.getLocalHost() - requestHeader.addHeader(BlueprintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) - requestHeader.addHeader(BlueprintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) - requestHeader.addHeader(BlueprintConstants.ONAP_PARTNER_NAME, BlueprintConstants.APP_NAME) + requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, BluePrintConstants.APP_NAME) requestHeader.addHeader("ClientIPAddress", localhost.hostAddress) } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt index 7431998d9..795d386f3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt @@ -17,13 +17,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.utils import io.micrometer.core.instrument.Tag -import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants class BlueprintMessageUtils { companion object { fun kafkaMetricTag(topic: String): MutableList = mutableListOf( - Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic) + Tag.of(BluePrintConstants.METRIC_TAG_TOPIC, topic) ) } } -- cgit 1.2.3-korg