diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
8 files changed, 93 insertions, 11 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md index baf168767..482bbc2cc 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md @@ -17,6 +17,9 @@ To List topics ---------------- kafka-topics --list --bootstrap-server localhost:9092 +To publish message +-------------------- +kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic To Listen for Output ---------------------- diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml index ac46b3635..c33adcb70 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml @@ -33,6 +33,10 @@ <dependencies> <dependency> <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> + <artifactId>atomix-lib</artifactId> + </dependency> + <dependency> + <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>message-lib</artifactId> </dependency> <dependency> diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt index c2965c4e8..35566abb4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -17,8 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa lateinit var prioritizationConfiguration: PrioritizationConfiguration lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + var clusterService: BluePrintClusterService? = null override fun init(context: ProcessorContext) { this.processorContext = context @@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa this.messagePrioritizationStateService = BluePrintDependencyService .messagePrioritizationStateService() } + + /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ + open fun initializeClusterService() { + /** Get the Cluster service to update in store */ + if (BluePrintConstants.CLUSTER_ENABLED) { + this.clusterService = BluePrintDependencyService.clusterService() + } + } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index ed124d1b2..b611060f7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLi import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList open class MessagePrioritizationConsumer( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -53,7 +54,7 @@ open class MessagePrioritizationConsumer( val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() log.info("Consuming prioritization topics($topics)") topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 431e02f30..4e4e2da7a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -25,6 +25,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -43,9 +44,13 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { + /** Get the cluster lock for message group */ + val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize) // Save the Message messagePrioritizationStateService.saveMessage(messagePrioritize) handleCorrelationAndNextStep(messagePrioritize) + /** Cluster unLock for message group */ + MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -68,12 +73,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA initializeExpiryPunctuator() /** Set up cleaning records cron */ initializeCleanPunctuator() + /** Set up Cluster Service */ + initializeClusterService() } override fun close() { log.info( "closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" + "taskId(${processorContext.taskId()})" ) expiryCancellable.cancel() cleanCancellable.cancel() @@ -102,7 +109,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA ) log.info( "Clean punctuator setup complete with expiry " + - "hold(${cleanConfiguration.expiredRecordsHoldDays})days" + "hold(${cleanConfiguration.expiredRecordsHoldDays})days" ) } @@ -115,7 +122,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA val types = getGroupCorrelationTypes(messagePrioritization) log.info( "checking correlation for message($id), group($group), types($types), " + - "correlation id($correlationId)" + "correlation id($correlationId)" ) /** Get all previously received messages from database for group and optional types and correlation Id */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 7e5862cce..d1f38f4f2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -17,14 +17,40 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService object MessageProcessorUtils { + /** Utility to create the cluster lock for message [messagePrioritization] */ + suspend fun prioritizationGrouplock( + clusterService: BluePrintClusterService?, + messagePrioritization: MessagePrioritization + ): ClusterLock? { + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritization-${messagePrioritization.group}" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility used to cluster unlock for message [messagePrioritization] */ + suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) { + if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) { + clusterLock.unLock() + clusterLock.close() + } + } + fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): - ProcessorSupplier<K, V> { + ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 0ed9598f0..f9e23e826 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -81,6 +81,9 @@ open class MessagePrioritizationConsumerTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + @Autowired + lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer + @Before fun setup() { BluePrintDependencyService.inject(applicationContext) @@ -119,7 +122,8 @@ open class MessagePrioritizationConsumerTest { val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) // Test Topology - val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val kafkaStreamConsumerFunction = + spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) val messageConsumerProperties = bluePrintMessageLibPropertyService .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) @@ -135,7 +139,6 @@ open class MessagePrioritizationConsumerTest { // @Test fun testMessagePrioritizationConsumer() { runBlocking { - val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) /** Send sample message with every 1 sec */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 37d853cfe..3d3d0c6f5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -21,6 +21,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan @@ -42,9 +43,34 @@ open class TestDatabaseConfiguration { } } -@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { +/* Sample Prioritization Listener, used during Application startup +@Component +open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { + + private val log = logger(SamplePrioritizationListeners::class) + + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + defaultMessagePrioritizationConsumer + .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + + @PreDestroy + open fun destroy() = runBlocking { + log.info("Shutting down PrioritizationListeners...") + defaultMessagePrioritizationConsumer.shutDown() + } +} + */ +@Service +open class SampleMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + +@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) +open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() { override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { return when (messagePrioritization.group) { "group-typed" -> arrayListOf("type-0", "type-1", "type-2") @@ -54,7 +80,7 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { } @Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) -open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() +open class SampleMessageAggregateProcessor() : MessageAggregateProcessor() @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class DefaultMessageOutputProcessor : MessageOutputProcessor() +open class SampleMessageOutputProcessor : MessageOutputProcessor() |