aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-01-02 11:59:29 -0500
committerBrinda Santh <bs2796@att.com>2020-01-02 11:59:29 -0500
commit4f4e2de08d3c6259da2497950a96d549d3e82f8a (patch)
tree4c5a8e7a97d96e9de73ecb7ed1b53597ee0ebdd5 /ms/blueprintsprocessor/functions
parentba75d2fad2b0111a510f4ee4cc87e658fb32ac4b (diff)
Message Prioritization message group lock.
Implementation to avoid concurrent procession of message group while prioritization. Sample message prioritization Kafka listener properties. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb
Diffstat (limited to 'ms/blueprintsprocessor/functions')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/README.md (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/README.txt)3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml4
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt12
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt28
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt7
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt34
8 files changed, 93 insertions, 11 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
index baf168767..482bbc2cc 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
@@ -17,6 +17,9 @@ To List topics
----------------
kafka-topics --list --bootstrap-server localhost:9092
+To publish message
+--------------------
+kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic
To Listen for Output
----------------------
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
index ac46b3635..c33adcb70 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
@@ -33,6 +33,10 @@
<dependencies>
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>atomix-lib</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>message-lib</artifactId>
</dependency>
<dependency>
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
index c2965c4e8..35566abb4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
@@ -17,8 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
lateinit var prioritizationConfiguration: PrioritizationConfiguration
lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+ var clusterService: BluePrintClusterService? = null
override fun init(context: ProcessorContext) {
this.processorContext = context
@@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
this.messagePrioritizationStateService = BluePrintDependencyService
.messagePrioritizationStateService()
}
+
+ /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
+ open fun initializeClusterService() {
+ /** Get the Cluster service to update in store */
+ if (BluePrintConstants.CLUSTER_ENABLED) {
+ this.clusterService = BluePrintDependencyService.clusterService()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
index ed124d1b2..b611060f7 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
open class MessagePrioritizationConsumer(
private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer(
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
as KafkaStreamsBasicAuthConsumerProperties
- val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
index 431e02f30..4e4e2da7a 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
@@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
@@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
+ /** Get the cluster lock for message group */
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
+ /** Cluster unLock for message group */
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -68,12 +73,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
initializeExpiryPunctuator()
/** Set up cleaning records cron */
initializeCleanPunctuator()
+ /** Set up Cluster Service */
+ initializeClusterService()
}
override fun close() {
log.info(
"closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
+ "taskId(${processorContext.taskId()})"
)
expiryCancellable.cancel()
cleanCancellable.cancel()
@@ -102,7 +109,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
)
log.info(
"Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
)
}
@@ -115,7 +122,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
val types = getGroupCorrelationTypes(messagePrioritization)
log.info(
"checking correlation for message($id), group($group), types($types), " +
- "correlation id($correlationId)"
+ "correlation id($correlationId)"
)
/** Get all previously received messages from database for group and optional types and correlation Id */
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 7e5862cce..d1f38f4f2 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -17,14 +17,40 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
+ /** Utility to create the cluster lock for message [messagePrioritization] */
+ suspend fun prioritizationGrouplock(
+ clusterService: BluePrintClusterService?,
+ messagePrioritization: MessagePrioritization
+ ): ClusterLock? {
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritization-${messagePrioritization.group}"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility used to cluster unlock for message [messagePrioritization] */
+ suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
+ if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+ clusterLock.unLock()
+ clusterLock.close()
+ }
+ }
+
fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
- ProcessorSupplier<K, V> {
+ ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 0ed9598f0..f9e23e826 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -81,6 +81,9 @@ open class MessagePrioritizationConsumerTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+ @Autowired
+ lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+
@Before
fun setup() {
BluePrintDependencyService.inject(applicationContext)
@@ -119,7 +122,8 @@ open class MessagePrioritizationConsumerTest {
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Test Topology
- val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val kafkaStreamConsumerFunction =
+ spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
val messageConsumerProperties = bluePrintMessageLibPropertyService
.messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
@@ -135,7 +139,6 @@ open class MessagePrioritizationConsumerTest {
// @Test
fun testMessagePrioritizationConsumer() {
runBlocking {
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
/** Send sample message with every 1 sec */
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 37d853cfe..3d3d0c6f5 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -21,6 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
@@ -42,9 +43,34 @@ open class TestDatabaseConfiguration {
}
}
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+/* Sample Prioritization Listener, used during Application startup
+@Component
+open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) {
+
+ private val log = logger(SamplePrioritizationListeners::class)
+
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer
+ .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
+
+ @PreDestroy
+ open fun destroy() = runBlocking {
+ log.info("Shutting down PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer.shutDown()
+ }
+}
+ */
+@Service
+open class SampleMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+
+@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return when (messagePrioritization.group) {
"group-typed" -> arrayListOf("type-0", "type-1", "type-2")
@@ -54,7 +80,7 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
}
@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
+open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class DefaultMessageOutputProcessor : MessageOutputProcessor()
+open class SampleMessageOutputProcessor : MessageOutputProcessor()