diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src')
15 files changed, 514 insertions, 108 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 8345df523..424929b82 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable { lateinit var shutDownConfiguration: ShutDownConfiguration lateinit var cleanConfiguration: CleanConfiguration var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration + var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration } open class KafkaConfiguration : Serializable { @@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable { lateinit var outputTopic: String // Publish Configuration Selector } +open class NatsConfiguration : Serializable { + lateinit var connectionSelector: String // Consumer Configuration Selector + lateinit var inputSubject: String // Publish Configuration Selector + lateinit var expiredSubject: String // Publish Configuration Selector + lateinit var outputSubject: String // Publish Configuration Selector +} + open class ExpiryConfiguration : Serializable { var frequencyMilli: Long = 30000L var maxPollRecord: Int = 1000 diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt index 464f97a88..dfe516953 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -16,21 +16,22 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization -import org.apache.kafka.streams.processor.ProcessorContext import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization interface MessagePrioritizationService { - fun setKafkaProcessorContext(processorContext: ProcessorContext?) + fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) + + fun getConfiguration(): PrioritizationConfiguration suspend fun prioritize(messagePrioritization: MessagePrioritization) /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ suspend fun output(messages: List<MessagePrioritization>) - /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */ - suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) + /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ + suspend fun updateExpiredMessages() - /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */ - suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) + /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ + suspend fun cleanExpiredMessage() } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt new file mode 100644 index 000000000..112a80379 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractKafkaMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractKafkaMessagePrioritizationService::class) + + lateinit var processorContext: ProcessorContext + + fun setKafkaProcessorContext(processorContext: ProcessorContext) { + this.processorContext = processorContext + } + + override suspend fun output(messages: List<MessagePrioritization>) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + processorContext.forward( + updatedMessage.id, + updatedMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + processorContext.forward( + expiredMessage.id, expiredMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt index 656646ff7..d4f8470c8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -17,17 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.logger /** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() { - private val log = logger(AbstractMessagePrioritizeProcessor::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun init(processorContext: ProcessorContext) { this.processorContext = processorContext } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt index 624a69fd4..1b0612492 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils open class DefaultMessagePrioritizeProcessor( private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val messagePrioritizationService: MessagePrioritizationService + private val kafkaMessagePrioritizationService: MessagePrioritizationService ) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { private val log = logger(DefaultMessagePrioritizeProcessor::class) @@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor( val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { - messagePrioritizationService.prioritize(messagePrioritize) + kafkaMessagePrioritizationService.prioritize(messagePrioritize) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor( override fun init(context: ProcessorContext) { super.init(context) /** Set Configuration and Processor Context to messagePrioritizationService */ - messagePrioritizationService.setKafkaProcessorContext(processorContext) + if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { + kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) + } else { + throw BluePrintProcessorException( + "messagePrioritizationService is not instance of " + + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" + ) + } } override fun close() { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt index fb7cfd110..d5ec0233a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties @@ -30,13 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList -open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +open class KafkaMessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService ) { - private val log = logger(MessagePrioritizationConsumer::class) + private val log = logger(KafkaMessagePrioritizationConsumer::class) - lateinit var streamingConsumerService: BlueprintMessageConsumerService + private lateinit var streamingConsumerService: BlueprintMessageConsumerService open fun consumerService(selector: String): BlueprintMessageConsumerService { return bluePrintMessageLibPropertyService @@ -67,8 +69,7 @@ open class MessagePrioritizationConsumer( topology.addProcessor( MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, bluePrintProcessorSupplier<ByteArray, ByteArray>( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE ), MessagePrioritizationConstants.SOURCE_INPUT ) @@ -100,7 +101,7 @@ open class MessagePrioritizationConsumer( } suspend fun shutDown() { - if (streamingConsumerService != null) { + if (::streamingConsumerService.isInitialized) { streamingConsumerService.shutDown() } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt new file mode 100644 index 000000000..502a7822d --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -0,0 +1,85 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractNatsMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractNatsMessagePrioritizationService::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + + override suspend fun output(messages: List<MessagePrioritization>) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + updatedMessage.asJsonType().asByteArray() + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + expiredMessage.asJsonType().asByteArray() + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..20da2c28c --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -0,0 +1,91 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import io.nats.streaming.MessageHandler +import io.nats.streaming.Subscription +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsMessagePrioritizationConsumer( + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, + private val natsMessagePrioritizationService: MessagePrioritizationService +) { + private val log = logger(NatsMessagePrioritizationConsumer::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + private lateinit var subscription: Subscription + + suspend fun startConsuming() { + val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() + val natsConfiguration = prioritizationConfiguration.natsConfiguration + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") + + check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { + "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." + } + bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) + natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService + val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) + val loadBalanceGroup = ClusterUtils.applicationName() + val messageHandler = createMessageHandler() + val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) + subscription = bluePrintNatsService.loadBalanceSubscribe( + inputSubject, + loadBalanceGroup, + messageHandler, + subscriptionOptions + ) + log.info( + "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." + ) + } + + suspend fun shutDown() { + if (::subscription.isInitialized) { + subscription.unsubscribe() + } + log.info("Nats prioritization consumer listener shutdown complete") + } + + private fun consumerService(selector: String): BluePrintNatsService { + return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) + } + + private fun createMessageHandler(): MessageHandler { + return MessageHandler { message -> + try { + val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) + runBlocking { + natsMessagePrioritizationService.prioritize(messagePrioritization) + } + } catch (e: Exception) { + log.error("failed to process prioritize message", e) + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index 931403200..a6963d83f 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -16,14 +16,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils @@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService( private val log = logger(AbstractMessagePrioritizationService::class) - var processorContext: ProcessorContext? = null + lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun setKafkaProcessorContext(processorContext: ProcessorContext?) { - this.processorContext = processorContext + override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { + this.prioritizationConfiguration = prioritizationConfiguration + } + + override fun getConfiguration(): PrioritizationConfiguration { + return this.prioritizationConfiguration } override suspend fun prioritize(messagePrioritize: MessagePrioritization) { try { log.info("***** received in prioritize processor key(${messagePrioritize.id})") + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + /** Get the cluster lock for message group */ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) // Save the Message @@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService( override suspend fun output(messages: List<MessagePrioritization>) { log.info("$$$$$ received in output processor id(${messages.ids()})") messages.forEach { message -> - val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - /** Check for Kafka Processing, If yes, then send to the output topic */ - if (this.processorContext != null) { - processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) - } + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) } } - override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) { + override suspend fun updateExpiredMessages() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() try { val fetchMessages = messagePrioritizationStateService .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) val expiredIds = fetchMessages?.ids() - if (expiredIds != null && expiredIds.isNotEmpty()) { + if (!expiredIds.isNullOrEmpty()) { messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - if (processorContext != null) { - fetchMessages.forEach { expired -> - expired.state = MessageState.EXPIRED.name - processorContext!!.forward( - expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } } } catch (e: Exception) { log.error("failed in updating expired messages", e) @@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService( } } - override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) { + override suspend fun cleanExpiredMessage() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration val clusterLock = MessageProcessorUtils.prioritizationCleanLock() try { messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt index b1c1fb15f..2f08c1c34 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService( } */ - open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) { + open suspend fun startScheduling() { + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + log.info("Starting Prioritization Scheduler Service...") GlobalScope.launch { expiryScheduler(prioritizationConfiguration) @@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.updateExpiredMessages(expiryConfiguration) + messagePrioritizationService.updateExpiredMessages() delay(expiryConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization expiry scheduler", e) @@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.cleanExpiredMessage(cleanConfiguration) + messagePrioritizationService.cleanExpiredMessage() delay(cleanConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization clean scheduler", e) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt index b7d878e4a..305e64ba4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -16,11 +16,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - private val log = logger(DefaultMessagePrioritizeProcessor::class) + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { /** Child overriding this implementation , if necessary */ override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +class SampleMessagePrioritizationHandler( + private val messagePrioritizationService: MessagePrioritizationService, + private val messagePrioritizationStateService: MessagePrioritizationStateService +) { + + private val log = logger(SampleMessagePrioritizationHandler::class) + + suspend fun handleAggregation(messages: List<MessagePrioritization>) { log.info("messages(${messages.ids()}) aggregated") /** Sequence based on Priority and Updated Date */ val sequencedMessage = messages.orderByHighestPriority() /** Update all messages to aggregated state */ - messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name) - output(sequencedMessage) + messagePrioritizationStateService.setMessagesState( + sequencedMessage.ids(), + MessageState.AGGREGATED.name + ) + messagePrioritizationService.output(sequencedMessage) } - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { return when (messagePrioritization.group) { /** Dummy Implementation, This can also be read from file and stored as cached map **/ "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + "pass-typed" -> arrayListOf(messagePrioritization.type) else -> null } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index e497ef144..2c4ae30da 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.Calendar +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate import java.util.Date import java.util.UUID @@ -35,6 +37,12 @@ object MessagePrioritizationSample { outputTopic = "prioritize-output-topic" expiredTopic = "prioritize-expired-topic" } + natsConfiguration = NatsConfiguration().apply { + connectionSelector = "cds-controller" + inputSubject = "prioritize-input" + outputSubject = "prioritize-output" + expiredSubject = "prioritize-expired" + } expiryConfiguration = ExpiryConfiguration().apply { frequencyMilli = 10000L maxPollRecord = 2000 @@ -66,9 +74,7 @@ object MessagePrioritizationSample { } private fun currentDatePlusDays(days: Int): Date { - val calender = Calendar.getInstance() - calender.add(Calendar.DATE, days) - return calender.time + return controllerDate().addDate(days) } fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 18b3e4dd7..9100fb51c 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.streams.processor.ProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation @@ -77,14 +76,11 @@ object MessageProcessorUtils { } } - /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/ - fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier<K, V> { + /** Get the Kafka Supplier for processor lookup [name] **/ + fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { // Dynamically resolve the Prioritization Processor - val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) - processorInstance.prioritizationConfiguration = prioritizationConfiguration - processorInstance + BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) } } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 190f4e891..7f150f5f3 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import io.mockk.coEvery import io.mockk.every +import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -27,13 +28,23 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.beans.factory.annotation.Autowired @@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull -import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DataJpaTest @DirtiesContext @ContextConfiguration( - classes = [BluePrintMessageLibConfiguration::class, + classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class] ) @TestPropertySource( properties = [ - "spring.jpa.show-sql=true", - "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.show-sql=false", + "spring.jpa.properties.hibernate.show_sql=false", "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", @@ -70,7 +80,11 @@ import kotlin.test.assertTrue // To send initial test message "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", + + "blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" ] ) open class MessagePrioritizationConsumerTest { @@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Autowired - lateinit var messagePrioritizationService: MessagePrioritizationService - - @Autowired - lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService + lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService @Autowired - lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService @Before fun setup() { @@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest { @Test fun testMessagePrioritizationService() { runBlocking { - assertTrue( - ::messagePrioritizationService.isInitialized, - "failed to initialize messagePrioritizationService" - ) + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) log.info("**************** without Correlation **************") /** Checking without correlation */ @@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest { val spyStreamingConsumerService = spyk(streamingConsumerService) coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = MessagePrioritizationConsumer( - bluePrintMessageLibPropertyService + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, mockk() ) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) @@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest { @Test fun testSchedulerService() { runBlocking { - val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration() - assertTrue( - ::messagePrioritizationSchedulerService.isInitialized, - "failed to initialize messagePrioritizationSchedulerService" - ) + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationSchedulerService = + MessagePrioritizationSchedulerService(messagePrioritizationService) launch { - messagePrioritizationSchedulerService.startScheduling(configuration) + messagePrioritizationSchedulerService.startScheduling() } launch { /** To debug increase the delay time */ @@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest { /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test - fun testMessagePrioritizationConsumer() { + fun testKafkaMessagePrioritizationConsumer() { runBlocking { - messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val kafkaMessagePrioritizationService = + SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) + kafkaMessagePrioritizationService.setConfiguration(configuration) + + val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( + messagePrioritizationStateService, + kafkaMessagePrioritizationService + ) + + // Register the processor + BluePrintDependencyService.registerSingleton( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + defaultMessagePrioritizeProcessor + ) + + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, + kafkaMessagePrioritizationService + ) + messagePrioritizationConsumer.startConsuming(configuration) /** Send sample message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService @@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest { messagePrioritizationConsumer.shutDown() } } + + /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker + * Start : + * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + * */ + // @Test + fun testNatsMessagePrioritizationConsumer() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") + + val inputSubject = + NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) + + val natsMessagePrioritizationService = + SampleNatsMessagePrioritizationService(messagePrioritizationStateService) + natsMessagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationConsumer = + NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) + messagePrioritizationConsumer.startConsuming() + + /** Send sample message with every 1 sec */ + val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService + + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(200) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + } + delay(3000) + messagePrioritizationConsumer.shutDown() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 0285079ad..22c399608 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -17,10 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio @Service open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : SampleMessagePrioritizationService(messagePrioritizationStateService) - -/** For Kafka Consumer **/ -@Service -open class TestMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService -) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) - -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor( - messagePrioritizationStateService: MessagePrioritizationStateService, - messagePrioritizationService: MessagePrioritizationService -) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService) |