From 4f4e2de08d3c6259da2497950a96d549d3e82f8a Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 2 Jan 2020 11:59:29 -0500 Subject: Message Prioritization message group lock. Implementation to avoid concurrent procession of message group while prioritization. Sample message prioritization Kafka listener properties. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb --- .../functions/message-prioritizaion/README.md | 31 ++++++++++++++++++++ .../functions/message-prioritizaion/README.txt | 28 ------------------ .../functions/message-prioritizaion/pom.xml | 4 +++ .../AbstractMessagePrioritizeProcessor.kt | 12 ++++++++ .../MessagePrioritizationConsumer.kt | 3 +- .../topology/MessagePrioritizeProcessor.kt | 13 +++++++-- .../prioritization/utils/MessageProcessorUtils.kt | 28 +++++++++++++++++- .../MessagePrioritizationConsumerTest.kt | 7 +++-- .../message/prioritization/TestConfiguration.kt | 34 +++++++++++++++++++--- 9 files changed, 121 insertions(+), 39 deletions(-) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/README.md delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/README.txt (limited to 'ms/blueprintsprocessor/functions') diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md new file mode 100644 index 000000000..482bbc2cc --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md @@ -0,0 +1,31 @@ + +To Delete Topics +------------------ +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic +kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog + +Create Topics +-------------- + +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic + +To List topics +---------------- +kafka-topics --list --bootstrap-server localhost:9092 + +To publish message +-------------------- +kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic + +To Listen for Output +---------------------- +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt deleted file mode 100644 index baf168767..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt +++ /dev/null @@ -1,28 +0,0 @@ - -To Delete Topics ------------------- -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic -kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog - -Create Topics --------------- - -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic - -To List topics ----------------- -kafka-topics --list --bootstrap-server localhost:9092 - - -To Listen for Output ----------------------- -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml index ac46b3635..c33adcb70 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml @@ -31,6 +31,10 @@ Blueprints Processor Function - Message Prioritization + + org.onap.ccsdk.cds.blueprintsprocessor + atomix-lib + org.onap.ccsdk.cds.blueprintsprocessor message-lib diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt index c2965c4e8..35566abb4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -17,8 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa lateinit var prioritizationConfiguration: PrioritizationConfiguration lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + var clusterService: BluePrintClusterService? = null override fun init(context: ProcessorContext) { this.processorContext = context @@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa this.messagePrioritizationStateService = BluePrintDependencyService .messagePrioritizationStateService() } + + /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ + open fun initializeClusterService() { + /** Get the Cluster service to update in store */ + if (BluePrintConstants.CLUSTER_ENABLED) { + this.clusterService = BluePrintDependencyService.clusterService() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ed124d1b2..b611060f7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class MessagePrioritizationConsumer( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer( val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() log.info("Consuming prioritization topics($topics)") topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 431e02f30..4e4e2da7a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier { + ProcessorSupplier { return ProcessorSupplier { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance>(name) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 0ed9598f0..f9e23e826 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -81,6 +81,9 @@ open class MessagePrioritizationConsumerTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + @Autowired + lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + @Before fun setup() { BluePrintDependencyService.inject(applicationContext) @@ -119,7 +122,8 @@ open class MessagePrioritizationConsumerTest { val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) // Test Topology - val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val kafkaStreamConsumerFunction = + spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) val messageConsumerProperties = bluePrintMessageLibPropertyService .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) @@ -135,7 +139,6 @@ open class MessagePrioritizationConsumerTest { // @Test fun testMessagePrioritizationConsumer() { runBlocking { - val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) /** Send sample message with every 1 sec */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 37d853cfe..3d3d0c6f5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -21,6 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -42,9 +43,34 @@ open class TestDatabaseConfiguration { } } -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { +/* Sample Prioritization Listener, used during Application startup +@Component +open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { + + private val log = logger(SamplePrioritizationListeners::class) + + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + defaultMessagePrioritizationConsumer + .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + + @PreDestroy + open fun destroy() = runBlocking { + log.info("Shutting down PrioritizationListeners...") + defaultMessagePrioritizationConsumer.shutDown() + } +} + */ +@Service +open class SampleMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + +@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) +open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() { override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { return when (messagePrioritization.group) { "group-typed" -> arrayListOf("type-0", "type-1", "type-2") @@ -54,7 +80,7 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { } @Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) -open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() +open class SampleMessageAggregateProcessor() : MessageAggregateProcessor() @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class DefaultMessageOutputProcessor : MessageOutputProcessor() +open class SampleMessageOutputProcessor : MessageOutputProcessor() -- cgit 1.2.3-korg