From 4f4e2de08d3c6259da2497950a96d549d3e82f8a Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 2 Jan 2020 11:59:29 -0500 Subject: Message Prioritization message group lock. Implementation to avoid concurrent procession of message group while prioritization. Sample message prioritization Kafka listener properties. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: Ifbf39985b03c662b6ccf7740be711cfeb7bfbebb --- .../src/main/resources/application-dev.properties | 7 +++++ .../functions/message-prioritizaion/README.md | 31 ++++++++++++++++++++ .../functions/message-prioritizaion/README.txt | 28 ------------------ .../functions/message-prioritizaion/pom.xml | 4 +++ .../AbstractMessagePrioritizeProcessor.kt | 12 ++++++++ .../MessagePrioritizationConsumer.kt | 3 +- .../topology/MessagePrioritizeProcessor.kt | 13 +++++++-- .../prioritization/utils/MessageProcessorUtils.kt | 28 +++++++++++++++++- .../MessagePrioritizationConsumerTest.kt | 7 +++-- .../message/prioritization/TestConfiguration.kt | 34 +++++++++++++++++++--- .../service/AtomixBluePrintClusterService.kt | 7 +++++ .../core/service/BluePrintClusterService.kt | 1 + 12 files changed, 136 insertions(+), 39 deletions(-) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/README.md delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/README.txt (limited to 'ms') diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties index 89b4f65b4..26c7204d9 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties @@ -129,3 +129,10 @@ blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id blueprintsprocessor.messageproducer.self-service-api.topic=producer.t + +# Message prioritization kakfa properties, Enable if Prioritization service is needed +# Deploy message-prioritization function along with blueprintsprocessor application. +#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth +#blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092 +#blueprintsprocessor.messageconsumer.prioritize-input.applicationId=cds-controller +#blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md new file mode 100644 index 000000000..482bbc2cc --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md @@ -0,0 +1,31 @@ + +To Delete Topics +------------------ +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic +kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog + +Create Topics +-------------- + +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic + +To List topics +---------------- +kafka-topics --list --bootstrap-server localhost:9092 + +To publish message +-------------------- +kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic + +To Listen for Output +---------------------- +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt deleted file mode 100644 index baf168767..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt +++ /dev/null @@ -1,28 +0,0 @@ - -To Delete Topics ------------------- -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic -kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog - -Create Topics --------------- - -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic - -To List topics ----------------- -kafka-topics --list --bootstrap-server localhost:9092 - - -To Listen for Output ----------------------- -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml index ac46b3635..c33adcb70 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml @@ -31,6 +31,10 @@ Blueprints Processor Function - Message Prioritization + + org.onap.ccsdk.cds.blueprintsprocessor + atomix-lib + org.onap.ccsdk.cds.blueprintsprocessor message-lib diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt index c2965c4e8..35566abb4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -17,8 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa lateinit var prioritizationConfiguration: PrioritizationConfiguration lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + var clusterService: BluePrintClusterService? = null override fun init(context: ProcessorContext) { this.processorContext = context @@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessa this.messagePrioritizationStateService = BluePrintDependencyService .messagePrioritizationStateService() } + + /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ + open fun initializeClusterService() { + /** Get the Cluster service to update in store */ + if (BluePrintConstants.CLUSTER_ENABLED) { + this.clusterService = BluePrintDependencyService.clusterService() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ed124d1b2..b611060f7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class MessagePrioritizationConsumer( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer( val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() log.info("Consuming prioritization topics($topics)") topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 431e02f30..4e4e2da7a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier { + ProcessorSupplier { return ProcessorSupplier { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance>(name) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 0ed9598f0..f9e23e826 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -81,6 +81,9 @@ open class MessagePrioritizationConsumerTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + @Autowired + lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + @Before fun setup() { BluePrintDependencyService.inject(applicationContext) @@ -119,7 +122,8 @@ open class MessagePrioritizationConsumerTest { val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) // Test Topology - val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val kafkaStreamConsumerFunction = + spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) val messageConsumerProperties = bluePrintMessageLibPropertyService .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) @@ -135,7 +139,6 @@ open class MessagePrioritizationConsumerTest { // @Test fun testMessagePrioritizationConsumer() { runBlocking { - val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) /** Send sample message with every 1 sec */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 37d853cfe..3d3d0c6f5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -21,6 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -42,9 +43,34 @@ open class TestDatabaseConfiguration { } } -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { +/* Sample Prioritization Listener, used during Application startup +@Component +open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { + + private val log = logger(SamplePrioritizationListeners::class) + + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + defaultMessagePrioritizationConsumer + .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + + @PreDestroy + open fun destroy() = runBlocking { + log.info("Shutting down PrioritizationListeners...") + defaultMessagePrioritizationConsumer.shutDown() + } +} + */ +@Service +open class SampleMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + +@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) +open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() { override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { return when (messagePrioritization.group) { "group-typed" -> arrayListOf("type-0", "type-1", "type-2") @@ -54,7 +80,7 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { } @Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) -open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() +open class SampleMessageAggregateProcessor() : MessageAggregateProcessor() @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class DefaultMessageOutputProcessor : MessageOutputProcessor() +open class SampleMessageOutputProcessor : MessageOutputProcessor() diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index 0690eb89d..214a14310 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -148,12 +148,18 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { } open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { + val log = logger(ClusterLockImpl::class) lateinit var distributedLock: DistributedLock + override fun name(): String { + return distributedLock.name() + } + override suspend fun lock() { distributedLock = AtomixLibUtils.distributedLock(atomix, name) distributedLock.lock() + log.debug("Cluster lock($name) created..") } override suspend fun tryLock(timeout: Long): Boolean { @@ -163,6 +169,7 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String) override suspend fun unLock() { distributedLock.unlock() + log.debug("Cluster unlock(${name()}) successfully..") } override fun isLocked(): Boolean { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 21fcfc509..f994628a2 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -57,6 +57,7 @@ data class ClusterInfo( data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) interface ClusterLock { + fun name(): String suspend fun lock() suspend fun tryLock(timeout: Long): Boolean suspend fun unLock() -- cgit 1.2.3-korg