From 8029f8e5332f107267ec11293c3099e54e87c67b Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 16 Jan 2020 11:21:50 -0500 Subject: Prioritization Optional NATS consumer support Add prioritization NATS consumer service and configuration data beans. Optimizing message prioritization service interface. Added Integration testing for NATS simulation. Updated sample docker compose for NATS support Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh Change-Id: Icd21e5e2ab7b64d6e6e4b0610599ca947555ee15 --- .../src/main/dc/docker-compose-cluster.yaml | 19 ++- .../application/src/main/dc/docker-compose.yaml | 21 ++++ .../prioritization/MessagePrioritizationData.kt | 8 ++ .../prioritization/MessagePrioritizationService.kt | 13 +- .../AbstractKafkaMessagePrioritizationService.kt | 84 +++++++++++++ .../kafka/AbstractMessagePrioritizeProcessor.kt | 6 - .../kafka/DefaultMessagePrioritizeProcessor.kt | 13 +- .../kafka/KafkaMessagePrioritizationConsumer.kt | 108 +++++++++++++++++ .../kafka/MessagePrioritizationConsumer.kt | 107 ----------------- .../AbstractNatsMessagePrioritizationService.kt | 85 +++++++++++++ .../nats/NatsMessagePrioritizationConsumer.kt | 91 ++++++++++++++ .../AbstractMessagePrioritizationService.kt | 45 ++++--- .../MessagePrioritizationSchedulerService.kt | 8 +- .../service/SampleMessagePrioritizationService.kt | 82 +++++++++++-- .../utils/MessagePrioritizationSample.kt | 14 ++- .../prioritization/utils/MessageProcessorUtils.kt | 10 +- .../MessagePrioritizationConsumerTest.kt | 133 +++++++++++++++++---- .../message/prioritization/TestConfiguration.kt | 15 --- .../core/service/BluePrintDependencyService.kt | 9 ++ .../core/utils/ClusterUtils.kt | 4 + .../nats/BluePrintNatsLibConfiguration.kt | 1 + .../nats/BluePrintNatsLibData.kt | 3 +- .../service/BluePrintNatsLibPropertyService.kt | 10 +- .../nats/service/TokenAuthNatsService.kt | 8 ++ .../nats/utils/NatsClusterUtils.kt | 45 +++++++ .../service/BluePrintNatsLibPropertyServiceTest.kt | 53 ++++++++ .../nats/service/BluePrintNatsServiceTest.kt | 2 +- 27 files changed, 778 insertions(+), 219 deletions(-) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt create mode 100644 ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt (limited to 'ms') diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml index 020038c26..7159534ba 100644 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml @@ -22,13 +22,17 @@ services: image: nats-streaming:latest container_name: nats hostname: nats - command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0" + command: "-cid cds-cluster --auth tokenAuth -store file -dir /opt/app/onap/nats/store --cluster_node_id nats-0" networks: - cds-network ports: - "8222:8222" - "4222:4222" restart: always + volumes: + - target: /opt/app/onap/nats/store + type: volume + source: nats-store cds-controller-0: depends_on: - db @@ -57,6 +61,7 @@ services: CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0 CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf + NATS_CLUSTER_ID: cds-cluster NATS_HOSTS: nats://nats:4222 APPLICATIONNAME: cds-controller BUNDLEVERSION: 1.0.0 @@ -90,6 +95,7 @@ services: CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0 CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf + NATS_CLUSTER_ID: cds-cluster NATS_HOSTS: nats://nats:4222 APPLICATIONNAME: resource-resolution BUNDLEVERSION: 1.0.0 @@ -116,6 +122,7 @@ services: CLUSTER_ID: cds-cluster CLUSTER_NODE_ID: py-executor-0 CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0 + NATS_CLUSTER_ID: cds-cluster NATS_HOSTS: nats://nats:4222 APPLICATIONNAME: py-executor BUNDLEVERSION: 1.0.0 @@ -132,6 +139,12 @@ volumes: type: none device: /opt/app/cds/mysql/data o: bind + nats-store: + driver: local + driver_opts: + type: none + device: /opt/app/cds/nats/nats-0/store + o: bind blueprints-deploy: driver: local driver_opts: @@ -142,13 +155,13 @@ volumes: driver: local driver_opts: type: none - device: /opt/app/cds/cds-controller/config + device: /opt/app/cds/cds-controller/cds-controller-0/config o: bind resource-resolution-config: driver: local driver_opts: type: none - device: /opt/app/cds/resource-resolution/config + device: /opt/app/cds/resource-resolution/resource-resolution-0/config o: bind networks: diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml index 20b17bc90..8f2a78639 100755 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml @@ -18,6 +18,21 @@ services: MYSQL_DATABASE: sdnctl MYSQL_USER: sdnctl MYSQL_PASSWORD: sdnctl + nats: + image: nats-streaming:latest + container_name: nats + hostname: nats + command: "-cid cds-cluster --auth tokenAuth -store file -dir /opt/app/onap/nats/store --cluster_node_id nats" + networks: + - cds-network + ports: + - "8222:8222" + - "4222:4222" + restart: always + volumes: + - target: /opt/app/onap/nats/store + type: volume + source: nats-store cds-controller-default: depends_on: - db @@ -89,6 +104,12 @@ volumes: type: none device: /opt/app/cds/mysql/data o: bind + nats-store: + driver: local + driver_opts: + type: none + device: /opt/app/cds/nats/store + o: bind blueprints-deploy: driver: local driver_opts: diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 8345df523..424929b82 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable { lateinit var shutDownConfiguration: ShutDownConfiguration lateinit var cleanConfiguration: CleanConfiguration var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration + var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration } open class KafkaConfiguration : Serializable { @@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable { lateinit var outputTopic: String // Publish Configuration Selector } +open class NatsConfiguration : Serializable { + lateinit var connectionSelector: String // Consumer Configuration Selector + lateinit var inputSubject: String // Publish Configuration Selector + lateinit var expiredSubject: String // Publish Configuration Selector + lateinit var outputSubject: String // Publish Configuration Selector +} + open class ExpiryConfiguration : Serializable { var frequencyMilli: Long = 30000L var maxPollRecord: Int = 1000 diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt index 464f97a88..dfe516953 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -16,21 +16,22 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization -import org.apache.kafka.streams.processor.ProcessorContext import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization interface MessagePrioritizationService { - fun setKafkaProcessorContext(processorContext: ProcessorContext?) + fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) + + fun getConfiguration(): PrioritizationConfiguration suspend fun prioritize(messagePrioritization: MessagePrioritization) /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ suspend fun output(messages: List) - /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */ - suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) + /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ + suspend fun updateExpiredMessages() - /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */ - suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) + /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ + suspend fun cleanExpiredMessage() } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt new file mode 100644 index 000000000..112a80379 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractKafkaMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractKafkaMessagePrioritizationService::class) + + lateinit var processorContext: ProcessorContext + + fun setKafkaProcessorContext(processorContext: ProcessorContext) { + this.processorContext = processorContext + } + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + processorContext.forward( + updatedMessage.id, + updatedMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + processorContext.forward( + expiredMessage.id, expiredMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt index 656646ff7..d4f8470c8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -17,17 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.logger /** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { - private val log = logger(AbstractMessagePrioritizeProcessor::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun init(processorContext: ProcessorContext) { this.processorContext = processorContext } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt index 624a69fd4..1b0612492 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils open class DefaultMessagePrioritizeProcessor( private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val messagePrioritizationService: MessagePrioritizationService + private val kafkaMessagePrioritizationService: MessagePrioritizationService ) : AbstractMessagePrioritizeProcessor() { private val log = logger(DefaultMessagePrioritizeProcessor::class) @@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor( val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { - messagePrioritizationService.prioritize(messagePrioritize) + kafkaMessagePrioritizationService.prioritize(messagePrioritize) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor( override fun init(context: ProcessorContext) { super.init(context) /** Set Configuration and Processor Context to messagePrioritizationService */ - messagePrioritizationService.setKafkaProcessorContext(processorContext) + if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { + kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) + } else { + throw BluePrintProcessorException( + "messagePrioritizationService is not instance of " + + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" + ) + } } override fun close() { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..d5ec0233a --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -0,0 +1,108 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList + +open class KafkaMessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(KafkaMessagePrioritizationConsumer::class) + + private lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + kafkaConsumerConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (::streamingConsumerService.isInitialized) { + streamingConsumerService.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt deleted file mode 100644 index fb7cfd110..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList - -open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService -) { - - private val log = logger(MessagePrioritizationConsumer::class) - - lateinit var streamingConsumerService: BlueprintMessageConsumerService - - open fun consumerService(selector: String): BlueprintMessageConsumerService { - return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) - } - - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): - KafkaStreamConsumerFunction { - return object : KafkaStreamConsumerFunction { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map? - ): Topology { - - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() - log.info("Consuming prioritization topics($topics)") - - topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration - ), - MessagePrioritizationConstants.SOURCE_INPUT - ) - - /** To receive completed and error messages */ - topology.addSink( - MessagePrioritizationConstants.SINK_OUTPUT, - kafkaConsumerConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - // Output will be sent to the group-output topic from Processor API - return topology - } - } - } - - suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) - - // Dynamic Consumer Function to create Topology - val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) - streamingConsumerService.consume(null, consumerFunction) - } - - suspend fun shutDown() { - if (streamingConsumerService != null) { - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt new file mode 100644 index 000000000..502a7822d --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -0,0 +1,85 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractNatsMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractNatsMessagePrioritizationService::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + updatedMessage.asJsonType().asByteArray() + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + expiredMessage.asJsonType().asByteArray() + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..20da2c28c --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -0,0 +1,91 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import io.nats.streaming.MessageHandler +import io.nats.streaming.Subscription +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsMessagePrioritizationConsumer( + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, + private val natsMessagePrioritizationService: MessagePrioritizationService +) { + private val log = logger(NatsMessagePrioritizationConsumer::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + private lateinit var subscription: Subscription + + suspend fun startConsuming() { + val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() + val natsConfiguration = prioritizationConfiguration.natsConfiguration + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") + + check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { + "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." + } + bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) + natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService + val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) + val loadBalanceGroup = ClusterUtils.applicationName() + val messageHandler = createMessageHandler() + val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) + subscription = bluePrintNatsService.loadBalanceSubscribe( + inputSubject, + loadBalanceGroup, + messageHandler, + subscriptionOptions + ) + log.info( + "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." + ) + } + + suspend fun shutDown() { + if (::subscription.isInitialized) { + subscription.unsubscribe() + } + log.info("Nats prioritization consumer listener shutdown complete") + } + + private fun consumerService(selector: String): BluePrintNatsService { + return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) + } + + private fun createMessageHandler(): MessageHandler { + return MessageHandler { message -> + try { + val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) + runBlocking { + natsMessagePrioritizationService.prioritize(messagePrioritization) + } + } catch (e: Exception) { + log.error("failed to process prioritize message", e) + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index 931403200..a6963d83f 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -16,14 +16,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils @@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService( private val log = logger(AbstractMessagePrioritizationService::class) - var processorContext: ProcessorContext? = null + lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun setKafkaProcessorContext(processorContext: ProcessorContext?) { - this.processorContext = processorContext + override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { + this.prioritizationConfiguration = prioritizationConfiguration + } + + override fun getConfiguration(): PrioritizationConfiguration { + return this.prioritizationConfiguration } override suspend fun prioritize(messagePrioritize: MessagePrioritization) { try { log.info("***** received in prioritize processor key(${messagePrioritize.id})") + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + /** Get the cluster lock for message group */ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) // Save the Message @@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService( override suspend fun output(messages: List) { log.info("$$$$$ received in output processor id(${messages.ids()})") messages.forEach { message -> - val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - /** Check for Kafka Processing, If yes, then send to the output topic */ - if (this.processorContext != null) { - processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) - } + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) } } - override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) { + override suspend fun updateExpiredMessages() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() try { val fetchMessages = messagePrioritizationStateService .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) val expiredIds = fetchMessages?.ids() - if (expiredIds != null && expiredIds.isNotEmpty()) { + if (!expiredIds.isNullOrEmpty()) { messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - if (processorContext != null) { - fetchMessages.forEach { expired -> - expired.state = MessageState.EXPIRED.name - processorContext!!.forward( - expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } } } catch (e: Exception) { log.error("failed in updating expired messages", e) @@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService( } } - override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) { + override suspend fun cleanExpiredMessage() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration val clusterLock = MessageProcessorUtils.prioritizationCleanLock() try { messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt index b1c1fb15f..2f08c1c34 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService( } */ - open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) { + open suspend fun startScheduling() { + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + log.info("Starting Prioritization Scheduler Service...") GlobalScope.launch { expiryScheduler(prioritizationConfiguration) @@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.updateExpiredMessages(expiryConfiguration) + messagePrioritizationService.updateExpiredMessages() delay(expiryConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization expiry scheduler", e) @@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.cleanExpiredMessage(cleanConfiguration) + messagePrioritizationService.cleanExpiredMessage() delay(cleanConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization clean scheduler", e) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt index b7d878e4a..305e64ba4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -16,11 +16,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - private val log = logger(DefaultMessagePrioritizeProcessor::class) + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { /** Child overriding this implementation , if necessary */ override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +class SampleMessagePrioritizationHandler( + private val messagePrioritizationService: MessagePrioritizationService, + private val messagePrioritizationStateService: MessagePrioritizationStateService +) { + + private val log = logger(SampleMessagePrioritizationHandler::class) + + suspend fun handleAggregation(messages: List) { log.info("messages(${messages.ids()}) aggregated") /** Sequence based on Priority and Updated Date */ val sequencedMessage = messages.orderByHighestPriority() /** Update all messages to aggregated state */ - messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name) - output(sequencedMessage) + messagePrioritizationStateService.setMessagesState( + sequencedMessage.ids(), + MessageState.AGGREGATED.name + ) + messagePrioritizationService.output(sequencedMessage) } - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { return when (messagePrioritization.group) { /** Dummy Implementation, This can also be read from file and stored as cached map **/ "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + "pass-typed" -> arrayListOf(messagePrioritization.type) else -> null } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index e497ef144..2c4ae30da 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.Calendar +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate import java.util.Date import java.util.UUID @@ -35,6 +37,12 @@ object MessagePrioritizationSample { outputTopic = "prioritize-output-topic" expiredTopic = "prioritize-expired-topic" } + natsConfiguration = NatsConfiguration().apply { + connectionSelector = "cds-controller" + inputSubject = "prioritize-input" + outputSubject = "prioritize-output" + expiredSubject = "prioritize-expired" + } expiryConfiguration = ExpiryConfiguration().apply { frequencyMilli = 10000L maxPollRecord = 2000 @@ -66,9 +74,7 @@ object MessagePrioritizationSample { } private fun currentDatePlusDays(days: Int): Date { - val calender = Calendar.getInstance() - calender.add(Calendar.DATE, days) - return calender.time + return controllerDate().addDate(days) } fun sampleMessages(messageState: String, count: Int): List { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 18b3e4dd7..9100fb51c 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.streams.processor.ProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation @@ -77,14 +76,11 @@ object MessageProcessorUtils { } } - /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/ - fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier { + /** Get the Kafka Supplier for processor lookup [name] **/ + fun bluePrintProcessorSupplier(name: String): ProcessorSupplier { return ProcessorSupplier { // Dynamically resolve the Prioritization Processor - val processorInstance = BluePrintDependencyService.instance>(name) - processorInstance.prioritizationConfiguration = prioritizationConfiguration - processorInstance + BluePrintDependencyService.instance>(name) } } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 190f4e891..7f150f5f3 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import io.mockk.coEvery import io.mockk.every +import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -27,13 +28,23 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.beans.factory.annotation.Autowired @@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull -import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DataJpaTest @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, + classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class] ) @TestPropertySource( properties = [ - "spring.jpa.show-sql=true", - "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.show-sql=false", + "spring.jpa.properties.hibernate.show_sql=false", "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", @@ -70,7 +80,11 @@ import kotlin.test.assertTrue // To send initial test message "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", + + "blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" ] ) open class MessagePrioritizationConsumerTest { @@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Autowired - lateinit var messagePrioritizationService: MessagePrioritizationService - - @Autowired - lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService + lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService @Autowired - lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService @Before fun setup() { @@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest { @Test fun testMessagePrioritizationService() { runBlocking { - assertTrue( - ::messagePrioritizationService.isInitialized, - "failed to initialize messagePrioritizationService" - ) + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) log.info("**************** without Correlation **************") /** Checking without correlation */ @@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest { val spyStreamingConsumerService = spyk(streamingConsumerService) coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = MessagePrioritizationConsumer( - bluePrintMessageLibPropertyService + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, mockk() ) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) @@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest { @Test fun testSchedulerService() { runBlocking { - val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration() - assertTrue( - ::messagePrioritizationSchedulerService.isInitialized, - "failed to initialize messagePrioritizationSchedulerService" - ) + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationSchedulerService = + MessagePrioritizationSchedulerService(messagePrioritizationService) launch { - messagePrioritizationSchedulerService.startScheduling(configuration) + messagePrioritizationSchedulerService.startScheduling() } launch { /** To debug increase the delay time */ @@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest { /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test - fun testMessagePrioritizationConsumer() { + fun testKafkaMessagePrioritizationConsumer() { runBlocking { - messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val kafkaMessagePrioritizationService = + SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) + kafkaMessagePrioritizationService.setConfiguration(configuration) + + val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( + messagePrioritizationStateService, + kafkaMessagePrioritizationService + ) + + // Register the processor + BluePrintDependencyService.registerSingleton( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + defaultMessagePrioritizeProcessor + ) + + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, + kafkaMessagePrioritizationService + ) + messagePrioritizationConsumer.startConsuming(configuration) /** Send sample message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService @@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest { messagePrioritizationConsumer.shutDown() } } + + /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker + * Start : + * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + * */ + // @Test + fun testNatsMessagePrioritizationConsumer() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") + + val inputSubject = + NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) + + val natsMessagePrioritizationService = + SampleNatsMessagePrioritizationService(messagePrioritizationStateService) + natsMessagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationConsumer = + NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) + messagePrioritizationConsumer.startConsuming() + + /** Send sample message with every 1 sec */ + val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService + + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(200) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + } + delay(3000) + messagePrioritizationConsumer.shutDown() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 0285079ad..22c399608 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -17,10 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio @Service open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : SampleMessagePrioritizationService(messagePrioritizationStateService) - -/** For Kafka Consumer **/ -@Service -open class TestMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService -) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) - -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor( - messagePrioritizationStateService: MessagePrioritizationStateService, - messagePrioritizationService: MessagePrioritizationService -) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService) diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt index df3bde1b4..e8457283e 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.springframework.context.ApplicationContext +import org.springframework.context.ConfigurableApplicationContext import kotlin.reflect.KClass /** @@ -36,6 +37,14 @@ object BluePrintDependencyService { BluePrintDependencyService.applicationContext = applicationContext } + /** Used to inject [instance] into spring application context for the [key], + * Use this method only for testing + * */ + fun registerSingleton(key: String, instance: Any) { + val configurableApplicationContext = applicationContext as ConfigurableApplicationContext + configurableApplicationContext.beanFactory.registerSingleton(key, instance) + } + inline fun instance(name: String): T { return applicationContext.getBean(name) as? T ?: throw BluePrintProcessorException("failed to get instance($name)") diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt index b52cd711b..7fe955b03 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt @@ -27,6 +27,10 @@ object ClusterUtils { return ip.hostName } + fun applicationName(): String { + return BluePrintConstants.APP_NAME + } + fun clusterId(): String { return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster" } diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt index 147d360ba..8d5d846d7 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt @@ -41,6 +41,7 @@ class NatsLibConstants { const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service" const val DEFULT_NATS_SELECTOR = "cds-controller" const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats." + const val PROPERTY_NATS_CLUSTER_ID = "NATS_CLUSTER_ID" const val TYPE_TOKEN_AUTH = "token-auth" const val TYPE_TLS_AUTH = "tls-auth" } diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt index 9767ac29d..74897f322 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt @@ -16,11 +16,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.nats +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils open class NatsConnectionProperties { lateinit var type: String - var clusterId: String = ClusterUtils.clusterId() + var clusterId: String = NatsClusterUtils.clusterId() var clientId: String = ClusterUtils.clusterNodeId() lateinit var host: String /** Rest endpoint selector to access Monitoring API */ diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt index faf171528..18d0639f1 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt @@ -50,15 +50,13 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic JacksonUtils.readValue(jsonNode, TLSAuthNatsConnectionProperties::class.java)!! } else -> { - throw BluePrintProcessorException("Nats type($type) not supported") + throw BluePrintProcessorException("NATS type($type) not supported") } } } fun natsConnectionProperties(prefix: String): NatsConnectionProperties { - val type = bluePrintPropertiesService.propertyBeanType( - "$prefix.type", String::class.java - ) + val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java) return when (type) { NatsLibConstants.TYPE_TOKEN_AUTH -> { tokenAuthNatsConnectionProperties(prefix) @@ -67,7 +65,7 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic tlsAuthNatsConnectionProperties(prefix) } else -> { - throw BluePrintProcessorException("Grpc type($type) not supported") + throw BluePrintProcessorException("NATS type($type) not supported") } } } @@ -90,7 +88,7 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic TLSAuthNatsService(natsConnectionProperties) } else -> { - throw BluePrintProcessorException("couldn't get nats service for properties $natsConnectionProperties") + throw BluePrintProcessorException("couldn't get NATS service for properties $natsConnectionProperties") } } } diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt index 60b7934ba..43a43bc03 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt @@ -21,15 +21,23 @@ import io.nats.client.Options import io.nats.streaming.NatsStreaming import io.nats.streaming.StreamingConnection import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthNatsConnectionProperties) : BluePrintNatsService { + private val log = logger(TokenAuthNatsService::class) + lateinit var streamingConnection: StreamingConnection override suspend fun connection(): StreamingConnection { if (!::streamingConnection.isInitialized) { + log.info( + "NATS connection requesting for cluster(${natsConnectionProperties.clusterId}) with" + + "clientId($natsConnectionProperties.clientId)" + ) + val serverList = natsConnectionProperties.host.splitCommaAsList() val options = Options.Builder() diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt new file mode 100644 index 000000000..a7726a14b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt @@ -0,0 +1,45 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +object NatsClusterUtils { + + fun clusterId(): String { + return System.getenv(NatsLibConstants.PROPERTY_NATS_CLUSTER_ID) + ?: ClusterUtils.clusterId() + } + + fun applicationSubject(appName: String, subject: String): String { + return "$appName.$subject" + } + + fun currentApplicationSubject(subject: String): String { + return "${BluePrintConstants.APP_NAME}.$subject" + } + + fun currentNodeDurable(subject: String): String { + return "${ClusterUtils.clusterNodeId()}-$subject" + } + + fun applicationLoadBalanceGroup(): String { + return "${BluePrintConstants.APP_NAME}" + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt new file mode 100644 index 000000000..ba993d96a --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt @@ -0,0 +1,53 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.nats.service + +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertTrue + +@RunWith(SpringRunner::class) +@ContextConfiguration( + classes = [BluePrintNatsLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + ["blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" + ] +) +class BluePrintNatsLibPropertyServiceTest { + + @Autowired + lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService + + @Test + fun testNatsProperties() { + assertTrue(::bluePrintNatsLibPropertyService.isInitialized) + bluePrintNatsLibPropertyService.bluePrintNatsService(NatsLibConstants.DEFULT_NATS_SELECTOR) + } +} diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt index 549be6481..721828ac9 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -84,7 +84,7 @@ class BluePrintNatsServiceTest { * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V */ // @Test - fun localTntegrationTest() { + fun localIntegrationTest() { runBlocking { val connectionProperties = TokenAuthNatsConnectionProperties().apply { -- cgit 1.2.3-korg