From 8029f8e5332f107267ec11293c3099e54e87c67b Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 16 Jan 2020 11:21:50 -0500 Subject: Prioritization Optional NATS consumer support Add prioritization NATS consumer service and configuration data beans. Optimizing message prioritization service interface. Added Integration testing for NATS simulation. Updated sample docker compose for NATS support Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh Change-Id: Icd21e5e2ab7b64d6e6e4b0610599ca947555ee15 --- .../prioritization/MessagePrioritizationData.kt | 8 ++ .../prioritization/MessagePrioritizationService.kt | 13 +- .../AbstractKafkaMessagePrioritizationService.kt | 84 +++++++++++++ .../kafka/AbstractMessagePrioritizeProcessor.kt | 6 - .../kafka/DefaultMessagePrioritizeProcessor.kt | 13 +- .../kafka/KafkaMessagePrioritizationConsumer.kt | 108 +++++++++++++++++ .../kafka/MessagePrioritizationConsumer.kt | 107 ----------------- .../AbstractNatsMessagePrioritizationService.kt | 85 +++++++++++++ .../nats/NatsMessagePrioritizationConsumer.kt | 91 ++++++++++++++ .../AbstractMessagePrioritizationService.kt | 45 ++++--- .../MessagePrioritizationSchedulerService.kt | 8 +- .../service/SampleMessagePrioritizationService.kt | 82 +++++++++++-- .../utils/MessagePrioritizationSample.kt | 14 ++- .../prioritization/utils/MessageProcessorUtils.kt | 10 +- .../MessagePrioritizationConsumerTest.kt | 133 +++++++++++++++++---- .../message/prioritization/TestConfiguration.kt | 15 --- 16 files changed, 614 insertions(+), 208 deletions(-) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src') diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 8345df523..424929b82 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable { lateinit var shutDownConfiguration: ShutDownConfiguration lateinit var cleanConfiguration: CleanConfiguration var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration + var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration } open class KafkaConfiguration : Serializable { @@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable { lateinit var outputTopic: String // Publish Configuration Selector } +open class NatsConfiguration : Serializable { + lateinit var connectionSelector: String // Consumer Configuration Selector + lateinit var inputSubject: String // Publish Configuration Selector + lateinit var expiredSubject: String // Publish Configuration Selector + lateinit var outputSubject: String // Publish Configuration Selector +} + open class ExpiryConfiguration : Serializable { var frequencyMilli: Long = 30000L var maxPollRecord: Int = 1000 diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt index 464f97a88..dfe516953 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -16,21 +16,22 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization -import org.apache.kafka.streams.processor.ProcessorContext import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization interface MessagePrioritizationService { - fun setKafkaProcessorContext(processorContext: ProcessorContext?) + fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) + + fun getConfiguration(): PrioritizationConfiguration suspend fun prioritize(messagePrioritization: MessagePrioritization) /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ suspend fun output(messages: List) - /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */ - suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) + /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ + suspend fun updateExpiredMessages() - /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */ - suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) + /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ + suspend fun cleanExpiredMessage() } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt new file mode 100644 index 000000000..112a80379 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractKafkaMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractKafkaMessagePrioritizationService::class) + + lateinit var processorContext: ProcessorContext + + fun setKafkaProcessorContext(processorContext: ProcessorContext) { + this.processorContext = processorContext + } + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + processorContext.forward( + updatedMessage.id, + updatedMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + processorContext.forward( + expiredMessage.id, expiredMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt index 656646ff7..d4f8470c8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -17,17 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.logger /** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { - private val log = logger(AbstractMessagePrioritizeProcessor::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun init(processorContext: ProcessorContext) { this.processorContext = processorContext } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt index 624a69fd4..1b0612492 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils open class DefaultMessagePrioritizeProcessor( private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val messagePrioritizationService: MessagePrioritizationService + private val kafkaMessagePrioritizationService: MessagePrioritizationService ) : AbstractMessagePrioritizeProcessor() { private val log = logger(DefaultMessagePrioritizeProcessor::class) @@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor( val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { - messagePrioritizationService.prioritize(messagePrioritize) + kafkaMessagePrioritizationService.prioritize(messagePrioritize) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor( override fun init(context: ProcessorContext) { super.init(context) /** Set Configuration and Processor Context to messagePrioritizationService */ - messagePrioritizationService.setKafkaProcessorContext(processorContext) + if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { + kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) + } else { + throw BluePrintProcessorException( + "messagePrioritizationService is not instance of " + + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" + ) + } } override fun close() { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..d5ec0233a --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -0,0 +1,108 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList + +open class KafkaMessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(KafkaMessagePrioritizationConsumer::class) + + private lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + kafkaConsumerConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (::streamingConsumerService.isInitialized) { + streamingConsumerService.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt deleted file mode 100644 index fb7cfd110..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList - -open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService -) { - - private val log = logger(MessagePrioritizationConsumer::class) - - lateinit var streamingConsumerService: BlueprintMessageConsumerService - - open fun consumerService(selector: String): BlueprintMessageConsumerService { - return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) - } - - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): - KafkaStreamConsumerFunction { - return object : KafkaStreamConsumerFunction { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map? - ): Topology { - - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() - log.info("Consuming prioritization topics($topics)") - - topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration - ), - MessagePrioritizationConstants.SOURCE_INPUT - ) - - /** To receive completed and error messages */ - topology.addSink( - MessagePrioritizationConstants.SINK_OUTPUT, - kafkaConsumerConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - // Output will be sent to the group-output topic from Processor API - return topology - } - } - } - - suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) - - // Dynamic Consumer Function to create Topology - val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) - streamingConsumerService.consume(null, consumerFunction) - } - - suspend fun shutDown() { - if (streamingConsumerService != null) { - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt new file mode 100644 index 000000000..502a7822d --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -0,0 +1,85 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractNatsMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractNatsMessagePrioritizationService::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + updatedMessage.asJsonType().asByteArray() + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + expiredMessage.asJsonType().asByteArray() + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..20da2c28c --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -0,0 +1,91 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import io.nats.streaming.MessageHandler +import io.nats.streaming.Subscription +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsMessagePrioritizationConsumer( + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, + private val natsMessagePrioritizationService: MessagePrioritizationService +) { + private val log = logger(NatsMessagePrioritizationConsumer::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + private lateinit var subscription: Subscription + + suspend fun startConsuming() { + val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() + val natsConfiguration = prioritizationConfiguration.natsConfiguration + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") + + check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { + "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." + } + bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) + natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService + val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) + val loadBalanceGroup = ClusterUtils.applicationName() + val messageHandler = createMessageHandler() + val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) + subscription = bluePrintNatsService.loadBalanceSubscribe( + inputSubject, + loadBalanceGroup, + messageHandler, + subscriptionOptions + ) + log.info( + "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." + ) + } + + suspend fun shutDown() { + if (::subscription.isInitialized) { + subscription.unsubscribe() + } + log.info("Nats prioritization consumer listener shutdown complete") + } + + private fun consumerService(selector: String): BluePrintNatsService { + return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) + } + + private fun createMessageHandler(): MessageHandler { + return MessageHandler { message -> + try { + val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) + runBlocking { + natsMessagePrioritizationService.prioritize(messagePrioritization) + } + } catch (e: Exception) { + log.error("failed to process prioritize message", e) + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index 931403200..a6963d83f 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -16,14 +16,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils @@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService( private val log = logger(AbstractMessagePrioritizationService::class) - var processorContext: ProcessorContext? = null + lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun setKafkaProcessorContext(processorContext: ProcessorContext?) { - this.processorContext = processorContext + override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { + this.prioritizationConfiguration = prioritizationConfiguration + } + + override fun getConfiguration(): PrioritizationConfiguration { + return this.prioritizationConfiguration } override suspend fun prioritize(messagePrioritize: MessagePrioritization) { try { log.info("***** received in prioritize processor key(${messagePrioritize.id})") + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + /** Get the cluster lock for message group */ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) // Save the Message @@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService( override suspend fun output(messages: List) { log.info("$$$$$ received in output processor id(${messages.ids()})") messages.forEach { message -> - val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - /** Check for Kafka Processing, If yes, then send to the output topic */ - if (this.processorContext != null) { - processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) - } + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) } } - override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) { + override suspend fun updateExpiredMessages() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() try { val fetchMessages = messagePrioritizationStateService .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) val expiredIds = fetchMessages?.ids() - if (expiredIds != null && expiredIds.isNotEmpty()) { + if (!expiredIds.isNullOrEmpty()) { messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - if (processorContext != null) { - fetchMessages.forEach { expired -> - expired.state = MessageState.EXPIRED.name - processorContext!!.forward( - expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } } } catch (e: Exception) { log.error("failed in updating expired messages", e) @@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService( } } - override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) { + override suspend fun cleanExpiredMessage() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration val clusterLock = MessageProcessorUtils.prioritizationCleanLock() try { messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt index b1c1fb15f..2f08c1c34 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService( } */ - open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) { + open suspend fun startScheduling() { + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + log.info("Starting Prioritization Scheduler Service...") GlobalScope.launch { expiryScheduler(prioritizationConfiguration) @@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.updateExpiredMessages(expiryConfiguration) + messagePrioritizationService.updateExpiredMessages() delay(expiryConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization expiry scheduler", e) @@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.cleanExpiredMessage(cleanConfiguration) + messagePrioritizationService.cleanExpiredMessage() delay(cleanConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization clean scheduler", e) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt index b7d878e4a..305e64ba4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -16,11 +16,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - private val log = logger(DefaultMessagePrioritizeProcessor::class) + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { /** Child overriding this implementation , if necessary */ override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +class SampleMessagePrioritizationHandler( + private val messagePrioritizationService: MessagePrioritizationService, + private val messagePrioritizationStateService: MessagePrioritizationStateService +) { + + private val log = logger(SampleMessagePrioritizationHandler::class) + + suspend fun handleAggregation(messages: List) { log.info("messages(${messages.ids()}) aggregated") /** Sequence based on Priority and Updated Date */ val sequencedMessage = messages.orderByHighestPriority() /** Update all messages to aggregated state */ - messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name) - output(sequencedMessage) + messagePrioritizationStateService.setMessagesState( + sequencedMessage.ids(), + MessageState.AGGREGATED.name + ) + messagePrioritizationService.output(sequencedMessage) } - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { return when (messagePrioritization.group) { /** Dummy Implementation, This can also be read from file and stored as cached map **/ "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + "pass-typed" -> arrayListOf(messagePrioritization.type) else -> null } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index e497ef144..2c4ae30da 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.Calendar +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate import java.util.Date import java.util.UUID @@ -35,6 +37,12 @@ object MessagePrioritizationSample { outputTopic = "prioritize-output-topic" expiredTopic = "prioritize-expired-topic" } + natsConfiguration = NatsConfiguration().apply { + connectionSelector = "cds-controller" + inputSubject = "prioritize-input" + outputSubject = "prioritize-output" + expiredSubject = "prioritize-expired" + } expiryConfiguration = ExpiryConfiguration().apply { frequencyMilli = 10000L maxPollRecord = 2000 @@ -66,9 +74,7 @@ object MessagePrioritizationSample { } private fun currentDatePlusDays(days: Int): Date { - val calender = Calendar.getInstance() - calender.add(Calendar.DATE, days) - return calender.time + return controllerDate().addDate(days) } fun sampleMessages(messageState: String, count: Int): List { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 18b3e4dd7..9100fb51c 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.streams.processor.ProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation @@ -77,14 +76,11 @@ object MessageProcessorUtils { } } - /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/ - fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier { + /** Get the Kafka Supplier for processor lookup [name] **/ + fun bluePrintProcessorSupplier(name: String): ProcessorSupplier { return ProcessorSupplier { // Dynamically resolve the Prioritization Processor - val processorInstance = BluePrintDependencyService.instance>(name) - processorInstance.prioritizationConfiguration = prioritizationConfiguration - processorInstance + BluePrintDependencyService.instance>(name) } } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 190f4e891..7f150f5f3 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import io.mockk.coEvery import io.mockk.every +import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -27,13 +28,23 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.beans.factory.annotation.Autowired @@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull -import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DataJpaTest @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, + classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class] ) @TestPropertySource( properties = [ - "spring.jpa.show-sql=true", - "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.show-sql=false", + "spring.jpa.properties.hibernate.show_sql=false", "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", @@ -70,7 +80,11 @@ import kotlin.test.assertTrue // To send initial test message "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", + + "blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" ] ) open class MessagePrioritizationConsumerTest { @@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Autowired - lateinit var messagePrioritizationService: MessagePrioritizationService - - @Autowired - lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService + lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService @Autowired - lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService @Before fun setup() { @@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest { @Test fun testMessagePrioritizationService() { runBlocking { - assertTrue( - ::messagePrioritizationService.isInitialized, - "failed to initialize messagePrioritizationService" - ) + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) log.info("**************** without Correlation **************") /** Checking without correlation */ @@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest { val spyStreamingConsumerService = spyk(streamingConsumerService) coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = MessagePrioritizationConsumer( - bluePrintMessageLibPropertyService + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, mockk() ) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) @@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest { @Test fun testSchedulerService() { runBlocking { - val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration() - assertTrue( - ::messagePrioritizationSchedulerService.isInitialized, - "failed to initialize messagePrioritizationSchedulerService" - ) + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationSchedulerService = + MessagePrioritizationSchedulerService(messagePrioritizationService) launch { - messagePrioritizationSchedulerService.startScheduling(configuration) + messagePrioritizationSchedulerService.startScheduling() } launch { /** To debug increase the delay time */ @@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest { /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test - fun testMessagePrioritizationConsumer() { + fun testKafkaMessagePrioritizationConsumer() { runBlocking { - messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val kafkaMessagePrioritizationService = + SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) + kafkaMessagePrioritizationService.setConfiguration(configuration) + + val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( + messagePrioritizationStateService, + kafkaMessagePrioritizationService + ) + + // Register the processor + BluePrintDependencyService.registerSingleton( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + defaultMessagePrioritizeProcessor + ) + + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, + kafkaMessagePrioritizationService + ) + messagePrioritizationConsumer.startConsuming(configuration) /** Send sample message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService @@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest { messagePrioritizationConsumer.shutDown() } } + + /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker + * Start : + * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + * */ + // @Test + fun testNatsMessagePrioritizationConsumer() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") + + val inputSubject = + NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) + + val natsMessagePrioritizationService = + SampleNatsMessagePrioritizationService(messagePrioritizationStateService) + natsMessagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationConsumer = + NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) + messagePrioritizationConsumer.startConsuming() + + /** Send sample message with every 1 sec */ + val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService + + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(200) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + } + delay(3000) + messagePrioritizationConsumer.shutDown() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 0285079ad..22c399608 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -17,10 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio @Service open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : SampleMessagePrioritizationService(messagePrioritizationStateService) - -/** For Kafka Consumer **/ -@Service -open class TestMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService -) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) - -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor( - messagePrioritizationStateService: MessagePrioritizationStateService, - messagePrioritizationService: MessagePrioritizationService -) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService) -- cgit 1.2.3-korg