diff options
author | Brinda Santh <bs2796@att.com> | 2020-01-13 11:37:17 -0500 |
---|---|---|
committer | KAPIL SINGAL <ks220y@att.com> | 2020-01-13 18:22:25 +0000 |
commit | a31c304eaceaa579b4be976d987f1cdeddba05c2 (patch) | |
tree | c9f94e2b066d955e66c184ed8bb327ae8a150ad8 /ms/blueprintsprocessor/functions/message-prioritizaion/src | |
parent | e9b1dfd73a2298cc9679e527ae90a651f5025dd2 (diff) |
Prioritization expiry and clean scheduler service
Add prioritization expiry and clean scheduler service implementation.
Optimizing message passing between processors.
Added message sorting utils.
Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I049ea3bae2e2c546244136f15c3d89deda1e7053
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src')
16 files changed, 377 insertions, 205 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 3ecfa27e0..8345df523 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -36,6 +36,10 @@ open class PrioritizationConfiguration : Serializable { lateinit var expiryConfiguration: ExpiryConfiguration lateinit var shutDownConfiguration: ShutDownConfiguration lateinit var cleanConfiguration: CleanConfiguration + var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration +} + +open class KafkaConfiguration : Serializable { lateinit var inputTopicSelector: String // Consumer Configuration Selector lateinit var expiredTopic: String // Publish Configuration Selector lateinit var outputTopic: String // Publish Configuration Selector diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt index 584fd00d3..464f97a88 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -25,5 +25,12 @@ interface MessagePrioritizationService { suspend fun prioritize(messagePrioritization: MessagePrioritization) - suspend fun output(id: String) + /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ + suspend fun output(messages: List<MessagePrioritization>) + + /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */ + suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) + + /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */ + suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt index 5dd41d7f3..2e5e6c617 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt @@ -35,6 +35,8 @@ interface MessagePrioritizationStateService { suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>? + suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? suspend fun getCorrelatedMessages( @@ -60,9 +62,11 @@ interface MessagePrioritizationStateService { suspend fun deleteMessage(id: String) + suspend fun deleteMessages(id: List<String>) + + suspend fun deleteExpiredMessage(retentionDays: Int) + suspend fun deleteMessageByGroup(group: String) suspend fun deleteMessageStates(group: String, states: List<String>) - - suspend fun deleteExpiredMessage(group: String, retentionDays: Int) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index 05b820adb..d8e71d413 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -50,3 +50,18 @@ fun MessagePrioritization.toFormatedCorrelation(): String { fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) } + +/** get list of message ids **/ +fun List<MessagePrioritization>.ids(): List<String> { + return this.map { it.id } +} + +/** Ordered by highest priority and updated date **/ +fun List<MessagePrioritization>.orderByHighestPriority(): List<MessagePrioritization> { + return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate)) +} + +/** Ordered by Updated date **/ +fun List<MessagePrioritization>.orderByUpdatedDate(): List<MessagePrioritization> { + return this.sortedWith(compareBy(MessagePrioritization::updatedDate)) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt index b0514838a..0b35e3856 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt @@ -33,20 +33,20 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Query( "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.createdDate asc" + "ORDER BY pm.createdDate asc" ) fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>? @Query( "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.updatedDate asc" + "ORDER BY pm.updatedDate asc" ) fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable): - List<MessagePrioritization>? + List<MessagePrioritization>? @Query( "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" ) fun findByGroupAndStateInAndNotExpiredDate( group: String, @@ -57,7 +57,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Query( "FROM MessagePrioritization pm WHERE pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" ) fun findByStateInAndExpiredDate( states: List<String>, @@ -67,7 +67,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Query( "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" ) fun findByGroupAndStateInAndExpiredDate( group: String, @@ -78,20 +78,25 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Query( "FROM MessagePrioritization pm WHERE pm.group = :group " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" ) - fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? + fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? @Query( "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" ) fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String): - List<MessagePrioritization>? + List<MessagePrioritization>? @Query( "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" ) fun findByGroupAndTypesAndCorrelationId( group: String, @@ -104,7 +109,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Transactional @Query( "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id = :id" + "WHERE id = :id" ) fun setStateForMessageId(id: String, state: String, currentDate: Date): Int @@ -112,7 +117,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Transactional @Query( "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id = :id" + "WHERE id = :id" ) fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int @@ -120,7 +125,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Transactional @Query( "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id IN :ids" + "WHERE id IN :ids" ) fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int @@ -128,7 +133,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Transactional @Query( "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id IN :ids" + "WHERE id IN :ids" ) fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int @@ -136,7 +141,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Transactional @Query( "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + - "WHERE id = :id" + "WHERE id = :id" ) fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int @@ -144,17 +149,27 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, @Transactional @Query( "UPDATE MessagePrioritization SET state = :state, " + - "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" ) fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int @Modifying @Transactional - @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") + @Query("DELETE FROM MessagePrioritization WHERE id IN :ids") + fun deleteByIds(ids: List<String>) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ") + fun deleteByExpiryDate(expiryCheckDate: Date) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE group = :group") fun deleteGroup(group: String) @Modifying @Transactional - @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") + @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states") fun deleteGroupAndStateIn(group: String, states: List<String>) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt index c14a404ad..624a69fd4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -16,9 +16,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka -import org.apache.kafka.streams.processor.Cancellable import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.PunctuationType import org.apache.kafka.streams.processor.To import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService @@ -28,7 +26,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.time.Duration open class DefaultMessagePrioritizeProcessor( private val messagePrioritizationStateService: MessagePrioritizationStateService, @@ -37,15 +34,11 @@ open class DefaultMessagePrioritizeProcessor( private val log = logger(DefaultMessagePrioritizeProcessor::class) - lateinit var expiryCancellable: Cancellable - lateinit var cleanCancellable: Cancellable - override suspend fun processNB(key: ByteArray, value: ByteArray) { val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) ?: throw BluePrintProcessorException("failed to convert") try { - messagePrioritizationService.setKafkaProcessorContext(processorContext) messagePrioritizationService.prioritize(messagePrioritize) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" @@ -65,10 +58,8 @@ open class DefaultMessagePrioritizeProcessor( override fun init(context: ProcessorContext) { super.init(context) - /** set up expiry marking cron */ - initializeExpiryPunctuator() - /** Set up cleaning records cron */ - initializeCleanPunctuator() + /** Set Configuration and Processor Context to messagePrioritizationService */ + messagePrioritizationService.setKafkaProcessorContext(processorContext) } override fun close() { @@ -76,40 +67,5 @@ open class DefaultMessagePrioritizeProcessor( "closing prioritization processor applicationId(${processorContext.applicationId()}), " + "taskId(${processorContext.taskId()})" ) - expiryCancellable.cancel() - cleanCancellable.cancel() - } - - open fun initializeExpiryPunctuator() { - val expiryPunctuator = - MessagePriorityExpiryPunctuator( - messagePrioritizationStateService - ) - expiryPunctuator.processorContext = processorContext - expiryPunctuator.configuration = prioritizationConfiguration - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - expiryCancellable = processorContext.schedule( - Duration.ofMillis(expiryConfiguration.frequencyMilli), - PunctuationType.WALL_CLOCK_TIME, expiryPunctuator - ) - log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") - } - - open fun initializeCleanPunctuator() { - val cleanPunctuator = - MessagePriorityCleanPunctuator( - messagePrioritizationStateService - ) - cleanPunctuator.processorContext = processorContext - cleanPunctuator.configuration = prioritizationConfiguration - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - cleanCancellable = processorContext.schedule( - Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), - PunctuationType.WALL_CLOCK_TIME, cleanPunctuator - ) - log.info( - "Clean punctuator setup complete with expiry " + - "hold(${cleanConfiguration.expiredRecordsHoldDays})days" - ) } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt index d7666a20b..fb7cfd110 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList @@ -46,6 +47,9 @@ open class MessagePrioritizationConsumer( KafkaStreamConsumerFunction { return object : KafkaStreamConsumerFunction { + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + override suspend fun createTopology( messageConsumerProperties: MessageConsumerProperties, additionalConfig: Map<String, Any>? @@ -72,7 +76,7 @@ open class MessagePrioritizationConsumer( /** To receive completed and error messages */ topology.addSink( MessagePrioritizationConstants.SINK_OUTPUT, - prioritizationConfiguration.outputTopic, + kafkaConsumerConfiguration.outputTopic, Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), MessagePrioritizationConstants.PROCESSOR_PRIORITIZE ) @@ -84,7 +88,11 @@ open class MessagePrioritizationConsumer( } suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { - streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector) + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) // Dynamic Consumer Function to create Topology val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt deleted file mode 100644 index e27cf16d0..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractBluePrintMessagePunctuator() { - - private val log = logger(MessagePriorityExpiryPunctuator::class) - lateinit var configuration: PrioritizationConfiguration - - override suspend fun punctuateNB(timestamp: Long) { - - log.info( - "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - val expiryConfiguration = configuration.expiryConfiguration - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - - val expiredIds = fetchMessages?.map { it.id } - if (expiredIds != null && expiredIds.isNotEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - fetchMessages.forEach { expired -> - processorContext.forward( - expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - } -} - -class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractBluePrintMessagePunctuator() { - - private val log = logger(MessagePriorityCleanPunctuator::class) - lateinit var configuration: PrioritizationConfiguration - - override suspend fun punctuateNB(timestamp: Long) { - log.info( - "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - // TODO - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index 13c0dd7bc..931403200 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -18,11 +18,14 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.apache.kafka.streams.processor.ProcessorContext import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -49,7 +52,7 @@ abstract class AbstractMessagePrioritizationService( messagePrioritizationStateService.saveMessage(messagePrioritize) handleCorrelationAndNextStep(messagePrioritize) /** Cluster unLock for message group */ - MessageProcessorUtils.prioritizationGroupUnLock(clusterLock) + MessageProcessorUtils.prioritizationUnLock(clusterLock) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -61,12 +64,50 @@ abstract class AbstractMessagePrioritizationService( } } - override suspend fun output(id: String) { - log.info("$$$$$ received in output processor id($id)") - val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name) - /** Check for Kafka Processing, If yes, then send to the output topic */ - if (this.processorContext != null) { - processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + override suspend fun output(messages: List<MessagePrioritization>) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + messages.forEach { message -> + val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + /** Check for Kafka Processing, If yes, then send to the output topic */ + if (this.processorContext != null) { + processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } + } + } + + override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) { + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + if (processorContext != null) { + fetchMessages.forEach { expired -> + expired.state = MessageState.EXPIRED.name + processorContext!!.forward( + expired.id, expired, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } + + override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) { + val clusterLock = MessageProcessorUtils.prioritizationCleanLock() + try { + messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) + } catch (e: Exception) { + log.error("failed in clean expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) } } @@ -78,7 +119,8 @@ abstract class AbstractMessagePrioritizationService( val correlationId = messagePrioritization.correlationId!! val types = getGroupCorrelationTypes(messagePrioritization) log.info( - "checking correlation for message($id), group($group), types($types), " + + "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " + + "correlation types($types), priority(${messagePrioritization.priority}), " + "correlation id($correlationId)" ) @@ -96,57 +138,50 @@ abstract class AbstractMessagePrioritizationService( .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) if (correlationResults.correlated) { - /** Correlation satisfied */ - val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id } - /** Send only correlated ids to aggregate processor */ - aggregate(correlatedIds) + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState( + waitingCorrelatedStoreMessages.ids(), + MessageState.PRIORITIZED.name + ) + /** Correlation satisfied, Send only correlated messages to aggregate processor */ + aggregate(waitingCorrelatedStoreMessages) } else { /** Correlation not satisfied */ log.trace("correlation not matched : ${correlationResults.message}") - val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id } // Update the Message state to Wait - messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name) + messagePrioritizationStateService.setMessagesState( + waitingCorrelatedStoreMessages.ids(), + MessageState.WAIT.name + ) } } else { /** received first message of group and correlation Id, update the message with wait state */ messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) } } else { - // No Correlation check needed, simply forward to next processor. + /** No Correlation check needed, simply forward to next processor. */ messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) - aggregate(messagePrioritization.id) + aggregate(arrayListOf(messagePrioritization)) } } - open suspend fun aggregate(strIds: String) { - log.info("@@@@@ received in aggregation processor ids($strIds)") - val ids = strIds.split(",").map { it.trim() } - if (!ids.isNullOrEmpty()) { + open suspend fun aggregate(messages: List<MessagePrioritization>) { + log.info("@@@@@ received in aggregation processor ids(${messages.ids()}") + if (!messages.isNullOrEmpty()) { try { - if (ids.size == 1) { - /** No aggregation or sequencing needed, simpley forward to next processor */ - output(ids.first()) - } else { - /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ - handleAggregation(ids) - /** Update all messages to Aggregated state */ - messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) - } + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(messages) } catch (e: Exception) { - val error = "failed in Aggregate message($ids) : ${e.message}" - log.error(error, e) - val storeMessages = messagePrioritizationStateService.getMessages(ids) - if (!storeMessages.isNullOrEmpty()) { - storeMessages.forEach { messagePrioritization -> + val error = "failed in aggregate message(${messages.ids()}) : ${e.message}" + if (!messages.isNullOrEmpty()) { + messages.forEach { messagePrioritization -> try { /** Update the data store */ messagePrioritizationStateService.setMessageStateANdError( messagePrioritization.id, MessageState.ERROR.name, error ) - /** Publish to output topic */ - output(messagePrioritization.id) } catch (sendException: Exception) { log.error( "failed to update/publish error message(${messagePrioritization.id}) : " + @@ -154,6 +189,8 @@ abstract class AbstractMessagePrioritizationService( ) } } + /** Publish to output topic */ + output(messages) } } } @@ -162,7 +199,7 @@ abstract class AbstractMessagePrioritizationService( /** Child will override this implementation , if necessary * Here the place child has to implement custom Sequencing and Aggregation logic. * */ - abstract suspend fun handleAggregation(messageIds: List<String>) + abstract suspend fun handleAggregation(messages: List<MessagePrioritization>) /** If consumer wants specific correlation with respect to group and types, then populate the specific types, * otherwise correlation happens with group and correlationId */ diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt new file mode 100644 index 000000000..b1c1fb15f --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -0,0 +1,94 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.stereotype.Service + +@Service +open class MessagePrioritizationSchedulerService( + private val messagePrioritizationService: MessagePrioritizationService +) { + private val log = logger(MessagePrioritizationSchedulerService::class) + + @Volatile + var keepGoing = true + + /** This is sample scheduler implementation used during starting application with configuration. + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + */ + + open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) { + log.info("Starting Prioritization Scheduler Service...") + GlobalScope.launch { + expiryScheduler(prioritizationConfiguration) + } + GlobalScope.launch { + cleanUpScheduler(prioritizationConfiguration) + } + } + + open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) { + keepGoing = false + delay(prioritizationConfiguration.shutDownConfiguration.waitMill) + } + + private suspend fun expiryScheduler( + prioritizationConfiguration: PrioritizationConfiguration + ) { + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec") + withContext(Dispatchers.Default) { + while (keepGoing) { + try { + messagePrioritizationService.updateExpiredMessages(expiryConfiguration) + delay(expiryConfiguration.frequencyMilli) + } catch (e: Exception) { + log.error("failed in prioritization expiry scheduler", e) + } + } + } + } + + private suspend fun cleanUpScheduler( + prioritizationConfiguration: PrioritizationConfiguration + ) { + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec") + withContext(Dispatchers.Default) { + while (keepGoing) { + try { + messagePrioritizationService.cleanExpiredMessage(cleanConfiguration) + delay(cleanConfiguration.frequencyMilli) + } catch (e: Exception) { + log.error("failed in prioritization clean scheduler", e) + } + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt index d9cd956bf..dde8d95e0 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -23,6 +23,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate import org.springframework.data.domain.PageRequest import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional @@ -77,9 +79,15 @@ open class MessagePrioritizationStateServiceImpl( ) } + override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? { + return prioritizationMessageRepository.findByExpiredDate( + expiryDate, PageRequest.of(0, count) + ) + } + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? { - return prioritizationMessageRepository.findByByGroupAndExpiredDate( + return prioritizationMessageRepository.findByGroupAndExpiredDate( group, expiryDate, PageRequest.of(0, count) ) @@ -142,21 +150,27 @@ open class MessagePrioritizationStateServiceImpl( } override suspend fun deleteMessage(id: String) { - return prioritizationMessageRepository.deleteById(id) + prioritizationMessageRepository.deleteById(id) + log.info("Prioritization Messages $id deleted successfully.") } - override suspend fun deleteMessageByGroup(group: String) { - return prioritizationMessageRepository.deleteGroup(group) + override suspend fun deleteMessages(ids: List<String>) { + prioritizationMessageRepository.deleteByIds(ids) + log.info("Prioritization Messages $ids deleted successfully.") } - override suspend fun deleteMessageStates(group: String, states: List<String>) { - return prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + override suspend fun deleteExpiredMessage(retentionDays: Int) { + val expiryCheckDate = controllerDate().addDate(retentionDays) + prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate) } - override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { - return prioritizationMessageRepository.deleteGroupAndStateIn( - group, - arrayListOf(MessageState.EXPIRED.name) - ) + override suspend fun deleteMessageByGroup(group: String) { + prioritizationMessageRepository.deleteGroup(group) + log.info("Prioritization Messages group($group) deleted successfully.") + } + + override suspend fun deleteMessageStates(group: String, states: List<String>) { + prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + log.info("Prioritization Messages group($group) with states($states) deleted successfully.") } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt index fcdb71cda..b7d878e4a 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -17,21 +17,27 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority import org.onap.ccsdk.cds.controllerblueprints.core.logger -open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : +/** Sample Prioritization Service, Define spring service injector to register in application*/ +open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { private val log = logger(DefaultMessagePrioritizeProcessor::class) /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messageIds: List<String>) { - log.info("messages($messageIds) aggregated") - messageIds.forEach { id -> - output(id) - } + override suspend fun handleAggregation(messages: List<MessagePrioritization>) { + log.info("messages(${messages.ids()}) aggregated") + /** Sequence based on Priority and Updated Date */ + val sequencedMessage = messages.orderByHighestPriority() + /** Update all messages to aggregated state */ + messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name) + output(sequencedMessage) } /** If consumer wants specific correlation with respect to group and types, then populate the specific types, diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index 4a36a40f3..e497ef144 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization @@ -29,9 +30,11 @@ object MessagePrioritizationSample { fun samplePrioritizationConfiguration(): PrioritizationConfiguration { return PrioritizationConfiguration().apply { - inputTopicSelector = "prioritize-input" - outputTopic = "prioritize-output-topic" - expiredTopic = "prioritize-expired-topic" + kafkaConfiguration = KafkaConfiguration().apply { + inputTopicSelector = "prioritize-input" + outputTopic = "prioritize-output-topic" + expiredTopic = "prioritize-expired-topic" + } expiryConfiguration = ExpiryConfiguration().apply { frequencyMilli = 10000L maxPollRecord = 2000 @@ -46,6 +49,22 @@ object MessagePrioritizationSample { } } + fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 20L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10L + expiredRecordsHoldDays = 5 + } + } + } + private fun currentDatePlusDays(days: Int): Date { val calender = Calendar.getInstance() calender.add(Calendar.DATE, days) @@ -68,7 +87,11 @@ object MessagePrioritizationSample { return messages } - fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> { + fun sampleMessageWithSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List<MessagePrioritization> { val messages: MutableList<MessagePrioritization> = arrayListOf() repeat(count) { val backPressureMessage = createMessage( @@ -108,7 +131,7 @@ object MessagePrioritizationSample { group = groupName type = messageType state = messageState - priority = 5 + priority = (1..10).shuffled().first() correlationId = messageCorrelationId message = "I am the Message" createdDate = Date() diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 186499d66..18b3e4dd7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -45,8 +45,32 @@ object MessageProcessorUtils { } else null } + /** Utility to create the cluster lock for expiry scheduler*/ + suspend fun prioritizationExpiryLock(): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritize-expiry" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility to create the cluster lock for expiry scheduler*/ + suspend fun prioritizationCleanLock(): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritize-clean" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + /** Utility used to cluster unlock for message [clusterLock] */ - suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) { + suspend fun prioritizationUnLock(clusterLock: ClusterLock?) { if (clusterLock != null) { clusterLock.unLock() clusterLock.close() diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index ec0515c42..190f4e891 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -28,6 +28,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService @@ -89,6 +90,9 @@ open class MessagePrioritizationConsumerTest { lateinit var messagePrioritizationService: MessagePrioritizationService @Autowired + lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService + + @Autowired lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer @Before @@ -151,7 +155,7 @@ open class MessagePrioritizationConsumerTest { val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() val streamingConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(configuration.inputTopicSelector) + .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector) assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") val spyStreamingConsumerService = spyk(streamingConsumerService) @@ -176,6 +180,25 @@ open class MessagePrioritizationConsumerTest { } } + @Test + fun testSchedulerService() { + runBlocking { + val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration() + assertTrue( + ::messagePrioritizationSchedulerService.isInitialized, + "failed to initialize messagePrioritizationSchedulerService" + ) + launch { + messagePrioritizationSchedulerService.startScheduling(configuration) + } + launch { + /** To debug increase the delay time */ + delay(20) + messagePrioritizationSchedulerService.shutdownScheduling(configuration) + } + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ // @Test fun testMessagePrioritizationConsumer() { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt index 3876cbba5..73d3738e5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -19,6 +19,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization. import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority +import kotlin.test.assertNotNull import kotlin.test.assertTrue class MessageCorrelationUtilsTest { @@ -59,10 +61,11 @@ class MessageCorrelationUtilsTest { /* Assumption is Same group with different types and one missing expected types, In this case type-3 message is missing */ - val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2", "type-3") - ) + val differentTypesWithSameCorrelationMessagesResWithMissingType = + MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3") + ) assertTrue( !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" @@ -118,4 +121,12 @@ class MessageCorrelationUtilsTest { "failed to correlate differentTypesWithDifferentCorrelationMessageResp" ) } + + @Test + fun testPrioritizationOrdering() { + val differentPriorityMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5) + val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority() + assertNotNull(orderedPriorityMessages, "failed to order the priority messages") + } } |