aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src/main')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt12
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt28
4 files changed, 51 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
index c2965c4e8..35566abb4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
@@ -17,8 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
lateinit var prioritizationConfiguration: PrioritizationConfiguration
lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+ var clusterService: BluePrintClusterService? = null
override fun init(context: ProcessorContext) {
this.processorContext = context
@@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
this.messagePrioritizationStateService = BluePrintDependencyService
.messagePrioritizationStateService()
}
+
+ /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
+ open fun initializeClusterService() {
+ /** Get the Cluster service to update in store */
+ if (BluePrintConstants.CLUSTER_ENABLED) {
+ this.clusterService = BluePrintDependencyService.clusterService()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
index ed124d1b2..b611060f7 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
open class MessagePrioritizationConsumer(
private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer(
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
as KafkaStreamsBasicAuthConsumerProperties
- val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
index 431e02f30..4e4e2da7a 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
@@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
@@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
+ /** Get the cluster lock for message group */
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
+ /** Cluster unLock for message group */
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -68,12 +73,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
initializeExpiryPunctuator()
/** Set up cleaning records cron */
initializeCleanPunctuator()
+ /** Set up Cluster Service */
+ initializeClusterService()
}
override fun close() {
log.info(
"closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
+ "taskId(${processorContext.taskId()})"
)
expiryCancellable.cancel()
cleanCancellable.cancel()
@@ -102,7 +109,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
)
log.info(
"Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
)
}
@@ -115,7 +122,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
val types = getGroupCorrelationTypes(messagePrioritization)
log.info(
"checking correlation for message($id), group($group), types($types), " +
- "correlation id($correlationId)"
+ "correlation id($correlationId)"
)
/** Get all previously received messages from database for group and optional types and correlation Id */
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 7e5862cce..d1f38f4f2 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -17,14 +17,40 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
+ /** Utility to create the cluster lock for message [messagePrioritization] */
+ suspend fun prioritizationGrouplock(
+ clusterService: BluePrintClusterService?,
+ messagePrioritization: MessagePrioritization
+ ): ClusterLock? {
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritization-${messagePrioritization.group}"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility used to cluster unlock for message [messagePrioritization] */
+ suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
+ if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+ clusterLock.unLock()
+ clusterLock.close()
+ }
+ }
+
fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
- ProcessorSupplier<K, V> {
+ ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)