aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion/src
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt4
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt9
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt15
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt53
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt48
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt12
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt69
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt113
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt94
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt36
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt18
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt33
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt26
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt25
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt19
16 files changed, 377 insertions, 205 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index 3ecfa27e0..8345df523 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -36,6 +36,10 @@ open class PrioritizationConfiguration : Serializable {
lateinit var expiryConfiguration: ExpiryConfiguration
lateinit var shutDownConfiguration: ShutDownConfiguration
lateinit var cleanConfiguration: CleanConfiguration
+ var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+}
+
+open class KafkaConfiguration : Serializable {
lateinit var inputTopicSelector: String // Consumer Configuration Selector
lateinit var expiredTopic: String // Publish Configuration Selector
lateinit var outputTopic: String // Publish Configuration Selector
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
index 584fd00d3..464f97a88 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -25,5 +25,12 @@ interface MessagePrioritizationService {
suspend fun prioritize(messagePrioritization: MessagePrioritization)
- suspend fun output(id: String)
+ /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
+ suspend fun output(messages: List<MessagePrioritization>)
+
+ /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */
+ suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration)
+
+ /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */
+ suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration)
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
index 5dd41d7f3..2e5e6c617 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
@@ -35,6 +35,8 @@ interface MessagePrioritizationStateService {
suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
List<MessagePrioritization>?
+ suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>?
+
suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
suspend fun getCorrelatedMessages(
@@ -60,9 +62,11 @@ interface MessagePrioritizationStateService {
suspend fun deleteMessage(id: String)
+ suspend fun deleteMessages(id: List<String>)
+
+ suspend fun deleteExpiredMessage(retentionDays: Int)
+
suspend fun deleteMessageByGroup(group: String)
suspend fun deleteMessageStates(group: String, states: List<String>)
-
- suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index 05b820adb..d8e71d413 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -50,3 +50,18 @@ fun MessagePrioritization.toFormatedCorrelation(): String {
fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
}
+
+/** get list of message ids **/
+fun List<MessagePrioritization>.ids(): List<String> {
+ return this.map { it.id }
+}
+
+/** Ordered by highest priority and updated date **/
+fun List<MessagePrioritization>.orderByHighestPriority(): List<MessagePrioritization> {
+ return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate))
+}
+
+/** Ordered by Updated date **/
+fun List<MessagePrioritization>.orderByUpdatedDate(): List<MessagePrioritization> {
+ return this.sortedWith(compareBy(MessagePrioritization::updatedDate))
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
index b0514838a..0b35e3856 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
@@ -33,20 +33,20 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "ORDER BY pm.createdDate asc"
+ "ORDER BY pm.createdDate asc"
)
fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "ORDER BY pm.updatedDate asc"
+ "ORDER BY pm.updatedDate asc"
)
fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable):
- List<MessagePrioritization>?
+ List<MessagePrioritization>?
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
+ "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
)
fun findByGroupAndStateInAndNotExpiredDate(
group: String,
@@ -57,7 +57,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Query(
"FROM MessagePrioritization pm WHERE pm.state in :states " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
)
fun findByStateInAndExpiredDate(
states: List<String>,
@@ -67,7 +67,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
)
fun findByGroupAndStateInAndExpiredDate(
group: String,
@@ -78,20 +78,25 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
)
- fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+ fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+ "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
)
fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String):
- List<MessagePrioritization>?
+ List<MessagePrioritization>?
@Query(
"FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+ "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
)
fun findByGroupAndTypesAndCorrelationId(
group: String,
@@ -104,7 +109,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Transactional
@Query(
"UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
- "WHERE id = :id"
+ "WHERE id = :id"
)
fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
@@ -112,7 +117,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Transactional
@Query(
"UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
- "WHERE id = :id"
+ "WHERE id = :id"
)
fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int
@@ -120,7 +125,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Transactional
@Query(
"UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
- "WHERE id IN :ids"
+ "WHERE id IN :ids"
)
fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
@@ -128,7 +133,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Transactional
@Query(
"UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
- "WHERE id IN :ids"
+ "WHERE id IN :ids"
)
fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int
@@ -136,7 +141,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Transactional
@Query(
"UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
- "WHERE id = :id"
+ "WHERE id = :id"
)
fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
@@ -144,17 +149,27 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
@Transactional
@Query(
"UPDATE MessagePrioritization SET state = :state, " +
- "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
+ "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
)
fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
@Modifying
@Transactional
- @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group")
+ @Query("DELETE FROM MessagePrioritization WHERE id IN :ids")
+ fun deleteByIds(ids: List<String>)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ")
+ fun deleteByExpiryDate(expiryCheckDate: Date)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization WHERE group = :group")
fun deleteGroup(group: String)
@Modifying
@Transactional
- @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states")
+ @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states")
fun deleteGroupAndStateIn(group: String, states: List<String>)
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
index c14a404ad..624a69fd4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -16,9 +16,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.To
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
@@ -28,7 +26,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
open class DefaultMessagePrioritizeProcessor(
private val messagePrioritizationStateService: MessagePrioritizationStateService,
@@ -37,15 +34,11 @@ open class DefaultMessagePrioritizeProcessor(
private val log = logger(DefaultMessagePrioritizeProcessor::class)
- lateinit var expiryCancellable: Cancellable
- lateinit var cleanCancellable: Cancellable
-
override suspend fun processNB(key: ByteArray, value: ByteArray) {
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
- messagePrioritizationService.setKafkaProcessorContext(processorContext)
messagePrioritizationService.prioritize(messagePrioritize)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
@@ -65,10 +58,8 @@ open class DefaultMessagePrioritizeProcessor(
override fun init(context: ProcessorContext) {
super.init(context)
- /** set up expiry marking cron */
- initializeExpiryPunctuator()
- /** Set up cleaning records cron */
- initializeCleanPunctuator()
+ /** Set Configuration and Processor Context to messagePrioritizationService */
+ messagePrioritizationService.setKafkaProcessorContext(processorContext)
}
override fun close() {
@@ -76,40 +67,5 @@ open class DefaultMessagePrioritizeProcessor(
"closing prioritization processor applicationId(${processorContext.applicationId()}), " +
"taskId(${processorContext.taskId()})"
)
- expiryCancellable.cancel()
- cleanCancellable.cancel()
- }
-
- open fun initializeExpiryPunctuator() {
- val expiryPunctuator =
- MessagePriorityExpiryPunctuator(
- messagePrioritizationStateService
- )
- expiryPunctuator.processorContext = processorContext
- expiryPunctuator.configuration = prioritizationConfiguration
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(
- Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
- )
- log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
- }
-
- open fun initializeCleanPunctuator() {
- val cleanPunctuator =
- MessagePriorityCleanPunctuator(
- messagePrioritizationStateService
- )
- cleanPunctuator.processorContext = processorContext
- cleanPunctuator.configuration = prioritizationConfiguration
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(
- Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
- )
- log.info(
- "Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
- )
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
index d7666a20b..fb7cfd110 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
@@ -46,6 +47,9 @@ open class MessagePrioritizationConsumer(
KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
override suspend fun createTopology(
messageConsumerProperties: MessageConsumerProperties,
additionalConfig: Map<String, Any>?
@@ -72,7 +76,7 @@ open class MessagePrioritizationConsumer(
/** To receive completed and error messages */
topology.addSink(
MessagePrioritizationConstants.SINK_OUTPUT,
- prioritizationConfiguration.outputTopic,
+ kafkaConsumerConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
)
@@ -84,7 +88,11 @@ open class MessagePrioritizationConsumer(
}
suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
- streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
+
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+ streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)
// Dynamic Consumer Function to create Topology
val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
deleted file mode 100644
index e27cf16d0..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
- AbstractBluePrintMessagePunctuator() {
-
- private val log = logger(MessagePriorityExpiryPunctuator::class)
- lateinit var configuration: PrioritizationConfiguration
-
- override suspend fun punctuateNB(timestamp: Long) {
-
- log.info(
- "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- val expiryConfiguration = configuration.expiryConfiguration
- val fetchMessages = messagePrioritizationStateService
- .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
-
- val expiredIds = fetchMessages?.map { it.id }
- if (expiredIds != null && expiredIds.isNotEmpty()) {
- messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- fetchMessages.forEach { expired ->
- processorContext.forward(
- expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
- }
-}
-
-class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
- AbstractBluePrintMessagePunctuator() {
-
- private val log = logger(MessagePriorityCleanPunctuator::class)
- lateinit var configuration: PrioritizationConfiguration
-
- override suspend fun punctuateNB(timestamp: Long) {
- log.info(
- "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- // TODO
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index 13c0dd7bc..931403200 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -18,11 +18,14 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -49,7 +52,7 @@ abstract class AbstractMessagePrioritizationService(
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
/** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -61,12 +64,50 @@ abstract class AbstractMessagePrioritizationService(
}
}
- override suspend fun output(id: String) {
- log.info("$$$$$ received in output processor id($id)")
- val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
- /** Check for Kafka Processing, If yes, then send to the output topic */
- if (this.processorContext != null) {
- processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ messages.forEach { message ->
+ val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ /** Check for Kafka Processing, If yes, then send to the output topic */
+ if (this.processorContext != null) {
+ processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
+ }
+ }
+
+ override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ if (processorContext != null) {
+ fetchMessages.forEach { expired ->
+ expired.state = MessageState.EXPIRED.name
+ processorContext!!.forward(
+ expired.id, expired,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+
+ override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+ val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
+ try {
+ messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
+ } catch (e: Exception) {
+ log.error("failed in clean expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
}
}
@@ -78,7 +119,8 @@ abstract class AbstractMessagePrioritizationService(
val correlationId = messagePrioritization.correlationId!!
val types = getGroupCorrelationTypes(messagePrioritization)
log.info(
- "checking correlation for message($id), group($group), types($types), " +
+ "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
+ "correlation types($types), priority(${messagePrioritization.priority}), " +
"correlation id($correlationId)"
)
@@ -96,57 +138,50 @@ abstract class AbstractMessagePrioritizationService(
.correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
if (correlationResults.correlated) {
- /** Correlation satisfied */
- val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
- /** Send only correlated ids to aggregate processor */
- aggregate(correlatedIds)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(
+ waitingCorrelatedStoreMessages.ids(),
+ MessageState.PRIORITIZED.name
+ )
+ /** Correlation satisfied, Send only correlated messages to aggregate processor */
+ aggregate(waitingCorrelatedStoreMessages)
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
- val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
// Update the Message state to Wait
- messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
+ messagePrioritizationStateService.setMessagesState(
+ waitingCorrelatedStoreMessages.ids(),
+ MessageState.WAIT.name
+ )
}
} else {
/** received first message of group and correlation Id, update the message with wait state */
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
}
} else {
- // No Correlation check needed, simply forward to next processor.
+ /** No Correlation check needed, simply forward to next processor. */
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- aggregate(messagePrioritization.id)
+ aggregate(arrayListOf(messagePrioritization))
}
}
- open suspend fun aggregate(strIds: String) {
- log.info("@@@@@ received in aggregation processor ids($strIds)")
- val ids = strIds.split(",").map { it.trim() }
- if (!ids.isNullOrEmpty()) {
+ open suspend fun aggregate(messages: List<MessagePrioritization>) {
+ log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
+ if (!messages.isNullOrEmpty()) {
try {
- if (ids.size == 1) {
- /** No aggregation or sequencing needed, simpley forward to next processor */
- output(ids.first())
- } else {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(ids)
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
- }
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(messages)
} catch (e: Exception) {
- val error = "failed in Aggregate message($ids) : ${e.message}"
- log.error(error, e)
- val storeMessages = messagePrioritizationStateService.getMessages(ids)
- if (!storeMessages.isNullOrEmpty()) {
- storeMessages.forEach { messagePrioritization ->
+ val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
+ if (!messages.isNullOrEmpty()) {
+ messages.forEach { messagePrioritization ->
try {
/** Update the data store */
messagePrioritizationStateService.setMessageStateANdError(
messagePrioritization.id,
MessageState.ERROR.name, error
)
- /** Publish to output topic */
- output(messagePrioritization.id)
} catch (sendException: Exception) {
log.error(
"failed to update/publish error message(${messagePrioritization.id}) : " +
@@ -154,6 +189,8 @@ abstract class AbstractMessagePrioritizationService(
)
}
}
+ /** Publish to output topic */
+ output(messages)
}
}
}
@@ -162,7 +199,7 @@ abstract class AbstractMessagePrioritizationService(
/** Child will override this implementation , if necessary
* Here the place child has to implement custom Sequencing and Aggregation logic.
* */
- abstract suspend fun handleAggregation(messageIds: List<String>)
+ abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
* otherwise correlation happens with group and correlationId */
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
new file mode 100644
index 000000000..b1c1fb15f
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
@@ -0,0 +1,94 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.stereotype.Service
+
+@Service
+open class MessagePrioritizationSchedulerService(
+ private val messagePrioritizationService: MessagePrioritizationService
+) {
+ private val log = logger(MessagePrioritizationSchedulerService::class)
+
+ @Volatile
+ var keepGoing = true
+
+ /** This is sample scheduler implementation used during starting application with configuration.
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
+ */
+
+ open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+ log.info("Starting Prioritization Scheduler Service...")
+ GlobalScope.launch {
+ expiryScheduler(prioritizationConfiguration)
+ }
+ GlobalScope.launch {
+ cleanUpScheduler(prioritizationConfiguration)
+ }
+ }
+
+ open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+ keepGoing = false
+ delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
+ }
+
+ private suspend fun expiryScheduler(
+ prioritizationConfiguration: PrioritizationConfiguration
+ ) {
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
+ withContext(Dispatchers.Default) {
+ while (keepGoing) {
+ try {
+ messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
+ delay(expiryConfiguration.frequencyMilli)
+ } catch (e: Exception) {
+ log.error("failed in prioritization expiry scheduler", e)
+ }
+ }
+ }
+ }
+
+ private suspend fun cleanUpScheduler(
+ prioritizationConfiguration: PrioritizationConfiguration
+ ) {
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
+ withContext(Dispatchers.Default) {
+ while (keepGoing) {
+ try {
+ messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
+ delay(cleanConfiguration.frequencyMilli)
+ } catch (e: Exception) {
+ log.error("failed in prioritization clean scheduler", e)
+ }
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
index d9cd956bf..dde8d95e0 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
@@ -23,6 +23,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
import org.springframework.data.domain.PageRequest
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
@@ -77,9 +79,15 @@ open class MessagePrioritizationStateServiceImpl(
)
}
+ override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByExpiredDate(
+ expiryDate, PageRequest.of(0, count)
+ )
+ }
+
override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByByGroupAndExpiredDate(
+ return prioritizationMessageRepository.findByGroupAndExpiredDate(
group,
expiryDate, PageRequest.of(0, count)
)
@@ -142,21 +150,27 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun deleteMessage(id: String) {
- return prioritizationMessageRepository.deleteById(id)
+ prioritizationMessageRepository.deleteById(id)
+ log.info("Prioritization Messages $id deleted successfully.")
}
- override suspend fun deleteMessageByGroup(group: String) {
- return prioritizationMessageRepository.deleteGroup(group)
+ override suspend fun deleteMessages(ids: List<String>) {
+ prioritizationMessageRepository.deleteByIds(ids)
+ log.info("Prioritization Messages $ids deleted successfully.")
}
- override suspend fun deleteMessageStates(group: String, states: List<String>) {
- return prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+ override suspend fun deleteExpiredMessage(retentionDays: Int) {
+ val expiryCheckDate = controllerDate().addDate(retentionDays)
+ prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate)
}
- override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
- return prioritizationMessageRepository.deleteGroupAndStateIn(
- group,
- arrayListOf(MessageState.EXPIRED.name)
- )
+ override suspend fun deleteMessageByGroup(group: String) {
+ prioritizationMessageRepository.deleteGroup(group)
+ log.info("Prioritization Messages group($group) deleted successfully.")
+ }
+
+ override suspend fun deleteMessageStates(group: String, states: List<String>) {
+ prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+ log.info("Prioritization Messages group($group) with states($states) deleted successfully.")
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
index fcdb71cda..b7d878e4a 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -17,21 +17,27 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+/** Sample Prioritization Service, Define spring service injector to register in application*/
+open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
AbstractMessagePrioritizationService(messagePrioritizationStateService) {
private val log = logger(DefaultMessagePrioritizeProcessor::class)
/** Child overriding this implementation , if necessary */
- override suspend fun handleAggregation(messageIds: List<String>) {
- log.info("messages($messageIds) aggregated")
- messageIds.forEach { id ->
- output(id)
- }
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ log.info("messages(${messages.ids()}) aggregated")
+ /** Sequence based on Priority and Updated Date */
+ val sequencedMessage = messages.orderByHighestPriority()
+ /** Update all messages to aggregated state */
+ messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name)
+ output(sequencedMessage)
}
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
index 4a36a40f3..e497ef144 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
@@ -29,9 +30,11 @@ object MessagePrioritizationSample {
fun samplePrioritizationConfiguration(): PrioritizationConfiguration {
return PrioritizationConfiguration().apply {
- inputTopicSelector = "prioritize-input"
- outputTopic = "prioritize-output-topic"
- expiredTopic = "prioritize-expired-topic"
+ kafkaConfiguration = KafkaConfiguration().apply {
+ inputTopicSelector = "prioritize-input"
+ outputTopic = "prioritize-output-topic"
+ expiredTopic = "prioritize-expired-topic"
+ }
expiryConfiguration = ExpiryConfiguration().apply {
frequencyMilli = 10000L
maxPollRecord = 2000
@@ -46,6 +49,22 @@ object MessagePrioritizationSample {
}
}
+ fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration {
+ return PrioritizationConfiguration().apply {
+ expiryConfiguration = ExpiryConfiguration().apply {
+ frequencyMilli = 10L
+ maxPollRecord = 2000
+ }
+ shutDownConfiguration = ShutDownConfiguration().apply {
+ waitMill = 20L
+ }
+ cleanConfiguration = CleanConfiguration().apply {
+ frequencyMilli = 10L
+ expiredRecordsHoldDays = 5
+ }
+ }
+ }
+
private fun currentDatePlusDays(days: Int): Date {
val calender = Calendar.getInstance()
calender.add(Calendar.DATE, days)
@@ -68,7 +87,11 @@ object MessagePrioritizationSample {
return messages
}
- fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+ fun sampleMessageWithSameCorrelation(
+ groupName: String,
+ messageState: String,
+ count: Int
+ ): List<MessagePrioritization> {
val messages: MutableList<MessagePrioritization> = arrayListOf()
repeat(count) {
val backPressureMessage = createMessage(
@@ -108,7 +131,7 @@ object MessagePrioritizationSample {
group = groupName
type = messageType
state = messageState
- priority = 5
+ priority = (1..10).shuffled().first()
correlationId = messageCorrelationId
message = "I am the Message"
createdDate = Date()
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 186499d66..18b3e4dd7 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -45,8 +45,32 @@ object MessageProcessorUtils {
} else null
}
+ /** Utility to create the cluster lock for expiry scheduler*/
+ suspend fun prioritizationExpiryLock(): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritize-expiry"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility to create the cluster lock for expiry scheduler*/
+ suspend fun prioritizationCleanLock(): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritize-clean"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
/** Utility used to cluster unlock for message [clusterLock] */
- suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) {
+ suspend fun prioritizationUnLock(clusterLock: ClusterLock?) {
if (clusterLock != null) {
clusterLock.unLock()
clusterLock.close()
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index ec0515c42..190f4e891 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -28,6 +28,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
@@ -89,6 +90,9 @@ open class MessagePrioritizationConsumerTest {
lateinit var messagePrioritizationService: MessagePrioritizationService
@Autowired
+ lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+
+ @Autowired
lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
@Before
@@ -151,7 +155,7 @@ open class MessagePrioritizationConsumerTest {
val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
val streamingConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(configuration.inputTopicSelector)
+ .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
val spyStreamingConsumerService = spyk(streamingConsumerService)
@@ -176,6 +180,25 @@ open class MessagePrioritizationConsumerTest {
}
}
+ @Test
+ fun testSchedulerService() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
+ assertTrue(
+ ::messagePrioritizationSchedulerService.isInitialized,
+ "failed to initialize messagePrioritizationSchedulerService"
+ )
+ launch {
+ messagePrioritizationSchedulerService.startScheduling(configuration)
+ }
+ launch {
+ /** To debug increase the delay time */
+ delay(20)
+ messagePrioritizationSchedulerService.shutdownScheduling(configuration)
+ }
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
fun testMessagePrioritizationConsumer() {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
index 3876cbba5..73d3738e5 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
@@ -19,6 +19,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
+import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class MessageCorrelationUtilsTest {
@@ -59,10 +61,11 @@ class MessageCorrelationUtilsTest {
/* Assumption is Same group with different types and one missing expected types,
In this case type-3 message is missing */
- val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithSameCorrelationMessages,
- arrayListOf("type-0", "type-1", "type-2", "type-3")
- )
+ val differentTypesWithSameCorrelationMessagesResWithMissingType =
+ MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2", "type-3")
+ )
assertTrue(
!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
"failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType"
@@ -118,4 +121,12 @@ class MessageCorrelationUtilsTest {
"failed to correlate differentTypesWithDifferentCorrelationMessageResp"
)
}
+
+ @Test
+ fun testPrioritizationOrdering() {
+ val differentPriorityMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5)
+ val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority()
+ assertNotNull(orderedPriorityMessages, "failed to order the priority messages")
+ }
}