diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src/main')
4 files changed, 51 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt index c2965c4e8..35566abb4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -17,8 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa lateinit var prioritizationConfiguration: PrioritizationConfiguration lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + var clusterService: BluePrintClusterService? = null override fun init(context: ProcessorContext) { this.processorContext = context @@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa this.messagePrioritizationStateService = BluePrintDependencyService .messagePrioritizationStateService() } + + /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ + open fun initializeClusterService() { + /** Get the Cluster service to update in store */ + if (BluePrintConstants.CLUSTER_ENABLED) { + this.clusterService = BluePrintDependencyService.clusterService() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ed124d1b2..b611060f7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class MessagePrioritizationConsumer( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer( val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() log.info("Consuming prioritization topics($topics)") topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 431e02f30..4e4e2da7a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { + /** Get the cluster lock for message group */ + val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize) // Save the Message messagePrioritizationStateService.saveMessage(messagePrioritize) handleCorrelationAndNextStep(messagePrioritize) + /** Cluster unLock for message group */ + MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -68,12 +73,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA initializeExpiryPunctuator() /** Set up cleaning records cron */ initializeCleanPunctuator() + /** Set up Cluster Service */ + initializeClusterService() } override fun close() { log.info( "closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" + "taskId(${processorContext.taskId()})" ) expiryCancellable.cancel() cleanCancellable.cancel() @@ -102,7 +109,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA ) log.info( "Clean punctuator setup complete with expiry " + - "hold(${cleanConfiguration.expiredRecordsHoldDays})days" + "hold(${cleanConfiguration.expiredRecordsHoldDays})days" ) } @@ -115,7 +122,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA val types = getGroupCorrelationTypes(messagePrioritization) log.info( "checking correlation for message($id), group($group), types($types), " + - "correlation id($correlationId)" + "correlation id($correlationId)" ) /** Get all previously received messages from database for group and optional types and correlation Id */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 7e5862cce..d1f38f4f2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -17,14 +17,40 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService object MessageProcessorUtils { + /** Utility to create the cluster lock for message [messagePrioritization] */ + suspend fun prioritizationGrouplock( + clusterService: BluePrintClusterService?, + messagePrioritization: MessagePrioritization + ): ClusterLock? { + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritization-${messagePrioritization.group}" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility used to cluster unlock for message [messagePrioritization] */ + suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) { + if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) { + clusterLock.unLock() + clusterLock.close() + } + } + fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier<K, V> { + ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) |