diff options
author | Brinda Santh <bs2796@att.com> | 2020-01-16 11:21:50 -0500 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2020-01-16 13:58:05 -0500 |
commit | 8029f8e5332f107267ec11293c3099e54e87c67b (patch) | |
tree | 5341184c90a8a0013ea207dae2ed59d1ba3fa80b /ms/blueprintsprocessor/functions/message-prioritizaion/src/main | |
parent | f52cf7ce4451fd5fa1bbc6cf30e5b2a0acab7276 (diff) |
Prioritization Optional NATS consumer support
Add prioritization NATS consumer service and configuration data beans.
Optimizing message prioritization service interface.
Added Integration testing for NATS simulation.
Updated sample docker compose for NATS support
Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Icd21e5e2ab7b64d6e6e4b0610599ca947555ee15
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src/main')
13 files changed, 406 insertions, 68 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 8345df523..424929b82 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable { lateinit var shutDownConfiguration: ShutDownConfiguration lateinit var cleanConfiguration: CleanConfiguration var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration + var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration } open class KafkaConfiguration : Serializable { @@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable { lateinit var outputTopic: String // Publish Configuration Selector } +open class NatsConfiguration : Serializable { + lateinit var connectionSelector: String // Consumer Configuration Selector + lateinit var inputSubject: String // Publish Configuration Selector + lateinit var expiredSubject: String // Publish Configuration Selector + lateinit var outputSubject: String // Publish Configuration Selector +} + open class ExpiryConfiguration : Serializable { var frequencyMilli: Long = 30000L var maxPollRecord: Int = 1000 diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt index 464f97a88..dfe516953 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -16,21 +16,22 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization -import org.apache.kafka.streams.processor.ProcessorContext import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization interface MessagePrioritizationService { - fun setKafkaProcessorContext(processorContext: ProcessorContext?) + fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) + + fun getConfiguration(): PrioritizationConfiguration suspend fun prioritize(messagePrioritization: MessagePrioritization) /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ suspend fun output(messages: List<MessagePrioritization>) - /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */ - suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) + /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ + suspend fun updateExpiredMessages() - /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */ - suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) + /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ + suspend fun cleanExpiredMessage() } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt new file mode 100644 index 000000000..112a80379 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractKafkaMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractKafkaMessagePrioritizationService::class) + + lateinit var processorContext: ProcessorContext + + fun setKafkaProcessorContext(processorContext: ProcessorContext) { + this.processorContext = processorContext + } + + override suspend fun output(messages: List<MessagePrioritization>) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + processorContext.forward( + updatedMessage.id, + updatedMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + processorContext.forward( + expiredMessage.id, expiredMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt index 656646ff7..d4f8470c8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -17,17 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.logger /** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() { - private val log = logger(AbstractMessagePrioritizeProcessor::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun init(processorContext: ProcessorContext) { this.processorContext = processorContext } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt index 624a69fd4..1b0612492 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils open class DefaultMessagePrioritizeProcessor( private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val messagePrioritizationService: MessagePrioritizationService + private val kafkaMessagePrioritizationService: MessagePrioritizationService ) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { private val log = logger(DefaultMessagePrioritizeProcessor::class) @@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor( val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { - messagePrioritizationService.prioritize(messagePrioritize) + kafkaMessagePrioritizationService.prioritize(messagePrioritize) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor( override fun init(context: ProcessorContext) { super.init(context) /** Set Configuration and Processor Context to messagePrioritizationService */ - messagePrioritizationService.setKafkaProcessorContext(processorContext) + if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { + kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) + } else { + throw BluePrintProcessorException( + "messagePrioritizationService is not instance of " + + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" + ) + } } override fun close() { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt index fb7cfd110..d5ec0233a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties @@ -30,13 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList -open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +open class KafkaMessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService ) { - private val log = logger(MessagePrioritizationConsumer::class) + private val log = logger(KafkaMessagePrioritizationConsumer::class) - lateinit var streamingConsumerService: BlueprintMessageConsumerService + private lateinit var streamingConsumerService: BlueprintMessageConsumerService open fun consumerService(selector: String): BlueprintMessageConsumerService { return bluePrintMessageLibPropertyService @@ -67,8 +69,7 @@ open class MessagePrioritizationConsumer( topology.addProcessor( MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, bluePrintProcessorSupplier<ByteArray, ByteArray>( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE ), MessagePrioritizationConstants.SOURCE_INPUT ) @@ -100,7 +101,7 @@ open class MessagePrioritizationConsumer( } suspend fun shutDown() { - if (streamingConsumerService != null) { + if (::streamingConsumerService.isInitialized) { streamingConsumerService.shutDown() } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt new file mode 100644 index 000000000..502a7822d --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -0,0 +1,85 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractNatsMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractNatsMessagePrioritizationService::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + + override suspend fun output(messages: List<MessagePrioritization>) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + updatedMessage.asJsonType().asByteArray() + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + expiredMessage.asJsonType().asByteArray() + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..20da2c28c --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -0,0 +1,91 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import io.nats.streaming.MessageHandler +import io.nats.streaming.Subscription +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsMessagePrioritizationConsumer( + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, + private val natsMessagePrioritizationService: MessagePrioritizationService +) { + private val log = logger(NatsMessagePrioritizationConsumer::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + private lateinit var subscription: Subscription + + suspend fun startConsuming() { + val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() + val natsConfiguration = prioritizationConfiguration.natsConfiguration + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") + + check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { + "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." + } + bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) + natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService + val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) + val loadBalanceGroup = ClusterUtils.applicationName() + val messageHandler = createMessageHandler() + val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) + subscription = bluePrintNatsService.loadBalanceSubscribe( + inputSubject, + loadBalanceGroup, + messageHandler, + subscriptionOptions + ) + log.info( + "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." + ) + } + + suspend fun shutDown() { + if (::subscription.isInitialized) { + subscription.unsubscribe() + } + log.info("Nats prioritization consumer listener shutdown complete") + } + + private fun consumerService(selector: String): BluePrintNatsService { + return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) + } + + private fun createMessageHandler(): MessageHandler { + return MessageHandler { message -> + try { + val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) + runBlocking { + natsMessagePrioritizationService.prioritize(messagePrioritization) + } + } catch (e: Exception) { + log.error("failed to process prioritize message", e) + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index 931403200..a6963d83f 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -16,14 +16,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils @@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService( private val log = logger(AbstractMessagePrioritizationService::class) - var processorContext: ProcessorContext? = null + lateinit var prioritizationConfiguration: PrioritizationConfiguration - override fun setKafkaProcessorContext(processorContext: ProcessorContext?) { - this.processorContext = processorContext + override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { + this.prioritizationConfiguration = prioritizationConfiguration + } + + override fun getConfiguration(): PrioritizationConfiguration { + return this.prioritizationConfiguration } override suspend fun prioritize(messagePrioritize: MessagePrioritization) { try { log.info("***** received in prioritize processor key(${messagePrioritize.id})") + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + /** Get the cluster lock for message group */ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) // Save the Message @@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService( override suspend fun output(messages: List<MessagePrioritization>) { log.info("$$$$$ received in output processor id(${messages.ids()})") messages.forEach { message -> - val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - /** Check for Kafka Processing, If yes, then send to the output topic */ - if (this.processorContext != null) { - processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) - } + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) } } - override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) { + override suspend fun updateExpiredMessages() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() try { val fetchMessages = messagePrioritizationStateService .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) val expiredIds = fetchMessages?.ids() - if (expiredIds != null && expiredIds.isNotEmpty()) { + if (!expiredIds.isNullOrEmpty()) { messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - if (processorContext != null) { - fetchMessages.forEach { expired -> - expired.state = MessageState.EXPIRED.name - processorContext!!.forward( - expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } } } catch (e: Exception) { log.error("failed in updating expired messages", e) @@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService( } } - override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) { + override suspend fun cleanExpiredMessage() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration val clusterLock = MessageProcessorUtils.prioritizationCleanLock() try { messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt index b1c1fb15f..2f08c1c34 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService( } */ - open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) { + open suspend fun startScheduling() { + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + log.info("Starting Prioritization Scheduler Service...") GlobalScope.launch { expiryScheduler(prioritizationConfiguration) @@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.updateExpiredMessages(expiryConfiguration) + messagePrioritizationService.updateExpiredMessages() delay(expiryConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization expiry scheduler", e) @@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService( withContext(Dispatchers.Default) { while (keepGoing) { try { - messagePrioritizationService.cleanExpiredMessage(cleanConfiguration) + messagePrioritizationService.cleanExpiredMessage() delay(cleanConfiguration.frequencyMilli) } catch (e: Exception) { log.error("failed in prioritization clean scheduler", e) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt index b7d878e4a..305e64ba4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -16,11 +16,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - private val log = logger(DefaultMessagePrioritizeProcessor::class) + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { /** Child overriding this implementation , if necessary */ override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +class SampleMessagePrioritizationHandler( + private val messagePrioritizationService: MessagePrioritizationService, + private val messagePrioritizationStateService: MessagePrioritizationStateService +) { + + private val log = logger(SampleMessagePrioritizationHandler::class) + + suspend fun handleAggregation(messages: List<MessagePrioritization>) { log.info("messages(${messages.ids()}) aggregated") /** Sequence based on Priority and Updated Date */ val sequencedMessage = messages.orderByHighestPriority() /** Update all messages to aggregated state */ - messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name) - output(sequencedMessage) + messagePrioritizationStateService.setMessagesState( + sequencedMessage.ids(), + MessageState.AGGREGATED.name + ) + messagePrioritizationService.output(sequencedMessage) } - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { return when (messagePrioritization.group) { /** Dummy Implementation, This can also be read from file and stored as cached map **/ "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + "pass-typed" -> arrayListOf(messagePrioritization.type) else -> null } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index e497ef144..2c4ae30da 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.Calendar +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate import java.util.Date import java.util.UUID @@ -35,6 +37,12 @@ object MessagePrioritizationSample { outputTopic = "prioritize-output-topic" expiredTopic = "prioritize-expired-topic" } + natsConfiguration = NatsConfiguration().apply { + connectionSelector = "cds-controller" + inputSubject = "prioritize-input" + outputSubject = "prioritize-output" + expiredSubject = "prioritize-expired" + } expiryConfiguration = ExpiryConfiguration().apply { frequencyMilli = 10000L maxPollRecord = 2000 @@ -66,9 +74,7 @@ object MessagePrioritizationSample { } private fun currentDatePlusDays(days: Int): Date { - val calender = Calendar.getInstance() - calender.add(Calendar.DATE, days) - return calender.time + return controllerDate().addDate(days) } fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 18b3e4dd7..9100fb51c 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.streams.processor.ProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation @@ -77,14 +76,11 @@ object MessageProcessorUtils { } } - /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/ - fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier<K, V> { + /** Get the Kafka Supplier for processor lookup [name] **/ + fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { // Dynamically resolve the Prioritization Processor - val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) - processorInstance.prioritizationConfiguration = prioritizationConfiguration - processorInstance + BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) } } } |