summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritization
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritization')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/README.md26
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/pom.xml43
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt33
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt89
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt37
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt72
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt67
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt80
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt89
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt175
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt84
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt28
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt78
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt108
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt64
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt85
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt92
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt203
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt98
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt176
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt120
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt82
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt148
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt86
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt350
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt65
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt132
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml42
28 files changed, 2752 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/README.md b/ms/blueprintsprocessor/functions/message-prioritization/README.md
new file mode 100644
index 000000000..cda43faca
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/README.md
@@ -0,0 +1,26 @@
+
+To Delete Topics
+------------------
+kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic
+kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic
+kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog
+
+Create Topics
+--------------
+
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
+
+To List topics
+----------------
+kafka-topics --list --bootstrap-server localhost:9092
+
+To publish message
+--------------------
+kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic
+
+To Listen for Output
+----------------------
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
+
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/pom.xml b/ms/blueprintsprocessor/functions/message-prioritization/pom.xml
new file mode 100644
index 000000000..9b6f3b1c3
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>blueprintsprocessor-functions</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId>
+ <artifactId>message-prioritization</artifactId>
+
+ <name>MS Blueprints Processor Functions - Message Prioritization</name>
+ <description>Blueprints Processor Function - Message Prioritization</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor.modules</groupId>
+ <artifactId>message-lib</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
new file mode 100644
index 000000000..890e0a6ba
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+@ComponentScan
+open class MessagePrioritizationConfiguration
+
+object MessagePrioritizationConstants {
+
+ const val SOURCE_INPUT = "source-prioritization-input"
+
+ const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
+
+ const val SINK_OUTPUT = "sink-prioritization-output"
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
new file mode 100644
index 000000000..65b7644a8
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import java.io.Serializable
+
+object MessageActionConstants {
+
+ const val PRIORITIZE = "prioritize"
+}
+
+enum class MessageState(val id: String) {
+ NEW("new"),
+ WAIT("wait"),
+ EXPIRED("expired"),
+ PRIORITIZED("prioritized"),
+ AGGREGATED("aggregated"),
+ COMPLETED("completed"),
+ ERROR("error")
+}
+
+open class PrioritizationConfiguration : Serializable {
+
+ lateinit var expiryConfiguration: ExpiryConfiguration
+ lateinit var shutDownConfiguration: ShutDownConfiguration
+ lateinit var cleanConfiguration: CleanConfiguration
+ var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+ var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration
+}
+
+open class KafkaConfiguration : Serializable {
+
+ lateinit var inputTopicSelector: String // Consumer Configuration Selector
+ lateinit var expiredTopic: String // Publish Configuration Selector
+ lateinit var outputTopic: String // Publish Configuration Selector
+}
+
+open class NatsConfiguration : Serializable {
+
+ lateinit var connectionSelector: String // Consumer Configuration Selector
+ lateinit var inputSubject: String // Publish Configuration Selector
+ lateinit var expiredSubject: String // Publish Configuration Selector
+ lateinit var outputSubject: String // Publish Configuration Selector
+}
+
+open class ExpiryConfiguration : Serializable {
+
+ var frequencyMilli: Long = 30000L
+ var maxPollRecord: Int = 1000
+}
+
+open class ShutDownConfiguration : Serializable {
+
+ var waitMill: Long = 30000L
+}
+
+open class CleanConfiguration : Serializable {
+
+ var frequencyMilli: Long = 30000L
+ var expiredRecordsHoldDays: Int = 5
+}
+
+open class UpdateStateRequest : Serializable {
+
+ lateinit var id: String
+ var group: String? = null
+ var state: String? = null
+}
+
+data class CorrelationCheckResponse(
+ var message: String? = null,
+ var correlated: Boolean = false
+)
+
+data class TypeCorrelationKey(val type: String, val correlationId: String)
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
new file mode 100644
index 000000000..dfe516953
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+
+interface MessagePrioritizationService {
+
+ fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration)
+
+ fun getConfiguration(): PrioritizationConfiguration
+
+ suspend fun prioritize(messagePrioritization: MessagePrioritization)
+
+ /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
+ suspend fun output(messages: List<MessagePrioritization>)
+
+ /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */
+ suspend fun updateExpiredMessages()
+
+ /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */
+ suspend fun cleanExpiredMessage()
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
new file mode 100644
index 000000000..2e5e6c617
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.Date
+
+interface MessagePrioritizationStateService {
+
+ suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+ suspend fun getMessage(id: String): MessagePrioritization
+
+ suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
+ suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+ suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+ suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>?
+
+ suspend fun updateMessagesState(ids: List<String>, state: String)
+
+ suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+ suspend fun setMessageState(id: String, state: String)
+
+ suspend fun setMessagesPriority(ids: List<String>, priority: String)
+
+ suspend fun setMessagesState(ids: List<String>, state: String)
+
+ suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+ suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
+
+ suspend fun deleteMessage(id: String)
+
+ suspend fun deleteMessages(id: List<String>)
+
+ suspend fun deleteExpiredMessage(retentionDays: Int)
+
+ suspend fun deleteMessageByGroup(group: String)
+
+ suspend fun deleteMessageStates(group: String, states: List<String>)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
new file mode 100644
index 000000000..d8e71d413
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+/**
+ * Register the MessagePrioritizationStateService and exposed dependency
+ */
+fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService =
+ instance(MessagePrioritizationStateService::class)
+
+/**
+ * Expose messagePrioritizationStateService to AbstractComponentFunction
+ */
+fun AbstractComponentFunction.messagePrioritizationStateService() =
+ BluePrintDependencyService.messagePrioritizationStateService()
+
+/**
+ * MessagePrioritization correlation extensions
+ */
+
+/**
+ * Arrange comma separated correlation keys in ascending order.
+ */
+fun MessagePrioritization.toFormatedCorrelation(): String {
+ return this.correlationId!!.split(",")
+ .map { it.trim() }.sorted().joinToString(",")
+}
+
+/**
+ * Used to group the correlation with respect to types.
+ */
+fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
+ return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
+}
+
+/** get list of message ids **/
+fun List<MessagePrioritization>.ids(): List<String> {
+ return this.map { it.id }
+}
+
+/** Ordered by highest priority and updated date **/
+fun List<MessagePrioritization>.orderByHighestPriority(): List<MessagePrioritization> {
+ return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate))
+}
+
+/** Ordered by Updated date **/
+fun List<MessagePrioritization>.orderByUpdatedDate(): List<MessagePrioritization> {
+ return this.sortedWith(compareBy(MessagePrioritization::updatedDate))
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
new file mode 100644
index 000000000..c7aab03b6
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
@@ -0,0 +1,80 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.mdcWebCoroutineScope
+import org.springframework.http.MediaType
+import org.springframework.web.bind.annotation.GetMapping
+import org.springframework.web.bind.annotation.PathVariable
+import org.springframework.web.bind.annotation.PostMapping
+import org.springframework.web.bind.annotation.RequestBody
+import org.springframework.web.bind.annotation.RequestMapping
+import org.springframework.web.bind.annotation.ResponseBody
+import org.springframework.web.bind.annotation.RestController
+
+@RestController
+@RequestMapping(value = ["/api/v1/message-prioritization"])
+open class MessagePrioritizationApi(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) {
+
+ @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
+ @ResponseBody
+ suspend fun ping(): String = mdcWebCoroutineScope { "Success" }
+
+ @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
+ @ResponseBody
+ suspend fun messagePrioritization(@PathVariable(value = "id") id: String) = mdcWebCoroutineScope {
+ messagePrioritizationStateService.getMessage(id)
+ }
+
+ @PostMapping(
+ path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
+ @ResponseBody
+ suspend fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) =
+ mdcWebCoroutineScope {
+ messagePrioritizationStateService.saveMessage(messagePrioritization)
+ }
+
+ @PostMapping(
+ path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
+ @ResponseBody
+ suspend fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = mdcWebCoroutineScope {
+ messagePrioritizationService.prioritize(messagePrioritization)
+ }
+
+ @PostMapping(
+ path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
+ suspend fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) =
+ mdcWebCoroutineScope {
+ messagePrioritizationStateService.setMessageState(
+ updateMessageState.id,
+ updateMessageState.state!!
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt
new file mode 100644
index 000000000..ce2085f68
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt
@@ -0,0 +1,89 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import com.fasterxml.jackson.annotation.JsonFormat
+import org.hibernate.annotations.Proxy
+import org.springframework.data.annotation.LastModifiedDate
+import org.springframework.data.jpa.domain.support.AuditingEntityListener
+import org.springframework.data.jpa.repository.config.EnableJpaAuditing
+import java.util.Date
+import javax.persistence.Column
+import javax.persistence.Entity
+import javax.persistence.EntityListeners
+import javax.persistence.Id
+import javax.persistence.Lob
+import javax.persistence.Table
+import javax.persistence.Temporal
+import javax.persistence.TemporalType
+
+@EnableJpaAuditing
+@EntityListeners(AuditingEntityListener::class)
+@Entity
+@Table(name = "MESSAGE_PRIORITIZATION")
+@Proxy(lazy = false)
+open class MessagePrioritization {
+
+ @Id
+ @Column(name = "message_id", length = 50)
+ lateinit var id: String
+
+ @Column(name = "message_group", length = 50, nullable = false)
+ lateinit var group: String
+
+ @Column(name = "message_type", length = 50, nullable = false)
+ lateinit var type: String
+
+ /** States Defined by MessageState */
+ @Column(name = "message_state", length = 20, nullable = false)
+ lateinit var state: String
+
+ @Column(name = "priority", nullable = false)
+ var priority: Int = 5
+
+ @Lob
+ @Column(name = "message", nullable = false)
+ var message: String? = null
+
+ @Lob
+ @Column(name = "error", nullable = true)
+ var error: String? = null
+
+ @Lob
+ @Column(name = "aggregated_message_ids", nullable = true)
+ var aggregatedMessageIds: String? = null
+
+ @Lob
+ @Column(name = "correlation_id", nullable = true)
+ var correlationId: String? = null
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ @Temporal(TemporalType.TIMESTAMP)
+ @Column(name = "created_date", nullable = false)
+ var createdDate = Date()
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ @LastModifiedDate
+ @Temporal(TemporalType.TIMESTAMP)
+ @Column(name = "updated_date", nullable = false)
+ var updatedDate: Date? = null
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ @Temporal(TemporalType.TIMESTAMP)
+ @Column(name = "expiry_date", nullable = false)
+ var expiryDate: Date? = null
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
new file mode 100644
index 000000000..0b35e3856
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import org.springframework.data.domain.Pageable
+import org.springframework.data.jpa.repository.JpaRepository
+import org.springframework.data.jpa.repository.Modifying
+import org.springframework.data.jpa.repository.Query
+import org.springframework.stereotype.Repository
+import org.springframework.transaction.annotation.Transactional
+import java.util.Date
+
+@Repository
+@Transactional(readOnly = true)
+interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> {
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc")
+ fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "ORDER BY pm.updatedDate asc"
+ )
+ fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable):
+ List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndStateInAndNotExpiredDate(
+ group: String,
+ states: List<String>,
+ expiryCheckDate: Date,
+ count: Pageable
+ ): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.state in :states " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByStateInAndExpiredDate(
+ states: List<String>,
+ expiryCheckDate: Date,
+ count: Pageable
+ ): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndStateInAndExpiredDate(
+ group: String,
+ states: List<String>,
+ expiryCheckDate: Date,
+ count: Pageable
+ ): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String):
+ List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndTypesAndCorrelationId(
+ group: String,
+ states: List<String>,
+ types: List<String>,
+ correlationId: String
+ ): List<MessagePrioritization>?
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+ "WHERE id = :id"
+ )
+ fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
+ "WHERE id = :id"
+ )
+ fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+ "WHERE id IN :ids"
+ )
+ fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
+ "WHERE id IN :ids"
+ )
+ fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
+ "WHERE id = :id"
+ )
+ fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, " +
+ "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
+ )
+ fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization WHERE id IN :ids")
+ fun deleteByIds(ids: List<String>)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ")
+ fun deleteByExpiryDate(expiryCheckDate: Date)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization WHERE group = :group")
+ fun deleteGroup(group: String)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states")
+ fun deleteGroupAndStateIn(group: String, states: List<String>)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
new file mode 100644
index 000000000..112a80379
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractKafkaMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractKafkaMessagePrioritizationService::class)
+
+ lateinit var processorContext: ProcessorContext
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ processorContext.forward(
+ updatedMessage.id,
+ updatedMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ processorContext.forward(
+ expiredMessage.id, expiredMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
new file mode 100644
index 000000000..d4f8470c8
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+
+/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
+abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
+
+ override fun init(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
new file mode 100644
index 000000000..1b0612492
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -0,0 +1,78 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+
+open class DefaultMessagePrioritizeProcessor(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
+) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ override suspend fun processNB(key: ByteArray, value: ByteArray) {
+
+ val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ try {
+ kafkaMessagePrioritizationService.prioritize(messagePrioritize)
+ } catch (e: Exception) {
+ messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+ log.error(messagePrioritize.error)
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
+ /** Publish to Output topic */
+ this.processorContext.forward(
+ messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ super.init(context)
+ /** Set Configuration and Processor Context to messagePrioritizationService */
+ if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) {
+ kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext)
+ } else {
+ throw BluePrintProcessorException(
+ "messagePrioritizationService is not instance of " +
+ "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}"
+ )
+ }
+ }
+
+ override fun close() {
+ log.info(
+ "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
new file mode 100644
index 000000000..4ab399f54
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+
+open class KafkaMessagePrioritizationConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
+) {
+
+ private val log = logger(KafkaMessagePrioritizationConsumer::class)
+
+ private lateinit var streamingConsumerService: BlueprintMessageConsumerService
+
+ open fun consumerService(selector: String): BlueprintMessageConsumerService {
+ return bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(selector)
+ }
+
+ open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
+ KafkaStreamConsumerFunction {
+ return object : KafkaStreamConsumerFunction {
+
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+ override suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology {
+
+ val topology = Topology()
+ val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
+
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
+ log.info("Consuming prioritization topics($topics)")
+
+ topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
+
+ topology.addProcessor(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ bluePrintProcessorSupplier<ByteArray, ByteArray>(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
+ ),
+ MessagePrioritizationConstants.SOURCE_INPUT
+ )
+
+ /** To receive completed and error messages */
+ topology.addSink(
+ MessagePrioritizationConstants.SINK_OUTPUT,
+ kafkaConsumerConfiguration.outputTopic,
+ Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
+ )
+
+ // Output will be sent to the group-output topic from Processor API
+ return topology
+ }
+ }
+ }
+
+ suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
+
+ val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+ ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+ streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)
+
+ // Dynamic Consumer Function to create Topology
+ val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
+ streamingConsumerService.consume(null, consumerFunction)
+ }
+
+ suspend fun shutDown() {
+ if (::streamingConsumerService.isInitialized) {
+ streamingConsumerService.shutDown()
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
new file mode 100644
index 000000000..5595863d4
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.nio.charset.Charset
+
+open class MessagePrioritizationSerde : Serde<MessagePrioritization> {
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+
+ override fun deserializer(): Deserializer<MessagePrioritization> {
+ return object : Deserializer<MessagePrioritization> {
+ override fun deserialize(topic: String, data: ByteArray): MessagePrioritization {
+ return JacksonUtils.readValue(String(data), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ }
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+ }
+ }
+
+ override fun serializer(): Serializer<MessagePrioritization> {
+ return object : Serializer<MessagePrioritization> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, data: MessagePrioritization): ByteArray {
+ return data.asJsonString().toByteArray(Charset.defaultCharset())
+ }
+
+ override fun close() {
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
new file mode 100644
index 000000000..502a7822d
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractNatsMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractNatsMessagePrioritizationService::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ updatedMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (!expiredIds.isNullOrEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ expiredMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
new file mode 100644
index 000000000..a0b2cf462
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.Subscription
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsMessagePrioritizationConsumer(
+ private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
+ private val natsMessagePrioritizationService: MessagePrioritizationService
+) {
+
+ private val log = logger(NatsMessagePrioritizationConsumer::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+ private lateinit var subscription: Subscription
+
+ suspend fun startConsuming() {
+ val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration()
+ val natsConfiguration = prioritizationConfiguration.natsConfiguration
+ ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration")
+
+ check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) {
+ "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService."
+ }
+ bluePrintNatsService = consumerService(natsConfiguration.connectionSelector)
+ natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService
+ val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)
+ val loadBalanceGroup = ClusterUtils.applicationName()
+ val messageHandler = createMessageHandler()
+ val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject))
+ subscription = bluePrintNatsService.loadBalanceSubscribe(
+ inputSubject,
+ loadBalanceGroup,
+ messageHandler,
+ subscriptionOptions
+ )
+ log.info(
+ "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)."
+ )
+ }
+
+ suspend fun shutDown() {
+ if (::subscription.isInitialized) {
+ subscription.unsubscribe()
+ }
+ log.info("Nats prioritization consumer listener shutdown complete")
+ }
+
+ private fun consumerService(selector: String): BluePrintNatsService {
+ return bluePrintNatsLibPropertyService.bluePrintNatsService(selector)
+ }
+
+ private fun createMessageHandler(): MessageHandler {
+ return MessageHandler { message ->
+ try {
+ val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java)
+ runBlocking {
+ natsMessagePrioritizationService.prioritize(messagePrioritization)
+ }
+ } catch (e: Exception) {
+ log.error("failed to process prioritize message", e)
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
new file mode 100644
index 000000000..f4602a810
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -0,0 +1,203 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
+
+ private val log = logger(AbstractMessagePrioritizationService::class)
+
+ lateinit var prioritizationConfiguration: PrioritizationConfiguration
+
+ override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+ this.prioritizationConfiguration = prioritizationConfiguration
+ }
+
+ override fun getConfiguration(): PrioritizationConfiguration {
+ return this.prioritizationConfiguration
+ }
+
+ override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
+ try {
+ log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ /** Get the cluster lock for message group */
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
+ // Save the Message
+ messagePrioritizationStateService.saveMessage(messagePrioritize)
+ handleCorrelationAndNextStep(messagePrioritize)
+ /** Cluster unLock for message group */
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ } catch (e: Exception) {
+ messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+ log.error(messagePrioritize.error)
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
+ }
+ }
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ messages.forEach { message ->
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (!expiredIds.isNullOrEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+
+ override suspend fun cleanExpiredMessage() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
+ try {
+ messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
+ } catch (e: Exception) {
+ log.error("failed in clean expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+
+ open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
+ /** Check correlation enabled and correlation field has populated */
+ if (!messagePrioritization.correlationId.isNullOrBlank()) {
+ val id = messagePrioritization.id
+ val group = messagePrioritization.group
+ val correlationId = messagePrioritization.correlationId!!
+ val types = getGroupCorrelationTypes(messagePrioritization)
+ log.info(
+ "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
+ "correlation types($types), priority(${messagePrioritization.priority}), " +
+ "correlation id($correlationId)"
+ )
+
+ /** Get all previously received messages from database for group and optional types and correlation Id */
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+ .getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
+
+ /** If multiple records found, then check correlation */
+ if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
+ /** Check all correlation satisfies */
+ val correlationResults = MessageCorrelationUtils
+ .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
+
+ if (correlationResults.correlated) {
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(
+ waitingCorrelatedStoreMessages.ids(),
+ MessageState.PRIORITIZED.name
+ )
+ /** Correlation satisfied, Send only correlated messages to aggregate processor */
+ aggregate(waitingCorrelatedStoreMessages)
+ } else {
+ /** Correlation not satisfied */
+ log.trace("correlation not matched : ${correlationResults.message}")
+ // Update the Message state to Wait
+ messagePrioritizationStateService.setMessagesState(
+ waitingCorrelatedStoreMessages.ids(),
+ MessageState.WAIT.name
+ )
+ }
+ } else {
+ /** received first message of group and correlation Id, update the message with wait state */
+ messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
+ }
+ } else {
+ /** No Correlation check needed, simply forward to next processor. */
+ messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
+ aggregate(arrayListOf(messagePrioritization))
+ }
+ }
+
+ open suspend fun aggregate(messages: List<MessagePrioritization>) {
+ log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
+ if (!messages.isNullOrEmpty()) {
+ try {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(messages)
+ } catch (e: Exception) {
+ val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
+ if (!messages.isNullOrEmpty()) {
+ messages.forEach { messagePrioritization ->
+ try {
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritization.id,
+ MessageState.ERROR.name, error
+ )
+ } catch (sendException: Exception) {
+ log.error(
+ "failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}",
+ e
+ )
+ }
+ }
+ /** Publish to output topic */
+ output(messages)
+ }
+ }
+ }
+ }
+
+ /** Child will override this implementation , if necessary
+ * Here the place child has to implement custom Sequencing and Aggregation logic.
+ * */
+ abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
new file mode 100644
index 000000000..529d773a4
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.stereotype.Service
+
+@Service
+open class MessagePrioritizationSchedulerService(
+ private val messagePrioritizationService: MessagePrioritizationService
+) {
+
+ private val log = logger(MessagePrioritizationSchedulerService::class)
+
+ @Volatile
+ var keepGoing = true
+
+ /** This is sample scheduler implementation used during starting application with configuration.
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
+ */
+
+ open suspend fun startScheduling() {
+ val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
+
+ log.info("Starting Prioritization Scheduler Service...")
+ GlobalScope.launch {
+ expiryScheduler(prioritizationConfiguration)
+ }
+ GlobalScope.launch {
+ cleanUpScheduler(prioritizationConfiguration)
+ }
+ }
+
+ open suspend fun shutdownScheduling() {
+ keepGoing = false
+ val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
+ delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
+ }
+
+ private suspend fun expiryScheduler(
+ prioritizationConfiguration: PrioritizationConfiguration
+ ) {
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
+ withContext(Dispatchers.Default) {
+ while (keepGoing) {
+ try {
+ messagePrioritizationService.updateExpiredMessages()
+ delay(expiryConfiguration.frequencyMilli)
+ } catch (e: Exception) {
+ log.error("failed in prioritization expiry scheduler", e)
+ }
+ }
+ }
+ }
+
+ private suspend fun cleanUpScheduler(
+ prioritizationConfiguration: PrioritizationConfiguration
+ ) {
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
+ withContext(Dispatchers.Default) {
+ while (keepGoing) {
+ try {
+ messagePrioritizationService.cleanExpiredMessage()
+ delay(cleanConfiguration.frequencyMilli)
+ } catch (e: Exception) {
+ log.error("failed in prioritization clean scheduler", e)
+ }
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
new file mode 100644
index 000000000..ed16fd44f
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
+import org.springframework.data.domain.PageRequest
+import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
+import java.util.Date
+
+@Service
+open class MessagePrioritizationStateServiceImpl(
+ private val prioritizationMessageRepository: PrioritizationMessageRepository
+) : MessagePrioritizationStateService {
+
+ private val log = logger(MessagePrioritizationStateServiceImpl::class)
+
+ @Transactional
+ override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
+ if (!message.correlationId.isNullOrBlank()) {
+ message.correlationId = message.toFormatedCorrelation()
+ }
+ message.updatedDate = Date()
+ return prioritizationMessageRepository.save(message)
+ }
+
+ override suspend fun getMessage(id: String): MessagePrioritization {
+ return prioritizationMessageRepository.findById(id).orElseGet(null)
+ ?: throw BluePrintProcessorException("couldn't find message for id($id)")
+ }
+
+ override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findAllById(ids)
+ }
+
+ override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
+ return prioritizationMessageRepository
+ .findByStateInAndExpiredDate(
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
+ Date(), PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
+ group,
+ states, Date(), PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
+ group,
+ states, Date(), PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByExpiredDate(
+ expiryDate, PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
+ List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndExpiredDate(
+ group,
+ expiryDate, PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>? {
+ return if (!types.isNullOrEmpty()) {
+ prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
+ } else {
+ prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
+ }
+ }
+
+ @Transactional
+ override suspend fun updateMessagesState(ids: List<String>, state: String) {
+ ids.forEach {
+ val updated = updateMessageState(it, state)
+ log.info("message($it) update to state(${updated.state})")
+ }
+ }
+
+ @Transactional
+ override suspend fun setMessageState(id: String, state: String) {
+ prioritizationMessageRepository.setStateForMessageId(id, state, Date())
+ }
+
+ @Transactional
+ override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
+ prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date())
+ }
+
+ @Transactional
+ override suspend fun setMessagesState(ids: List<String>, state: String) {
+ prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
+ }
+
+ @Transactional
+ override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
+ prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
+ }
+
+ @Transactional
+ override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
+ val updateMessage = getMessage(id).apply {
+ this.updatedDate = Date()
+ this.state = state
+ }
+ return saveMessage(updateMessage)
+ }
+
+ @Transactional
+ override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
+ val groupedIds = aggregatedIds.joinToString(",")
+ prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date())
+ }
+
+ override suspend fun deleteMessage(id: String) {
+ prioritizationMessageRepository.deleteById(id)
+ log.info("Prioritization Messages $id deleted successfully.")
+ }
+
+ override suspend fun deleteMessages(ids: List<String>) {
+ prioritizationMessageRepository.deleteByIds(ids)
+ log.info("Prioritization Messages $ids deleted successfully.")
+ }
+
+ override suspend fun deleteExpiredMessage(retentionDays: Int) {
+ val expiryCheckDate = controllerDate().addDate(retentionDays)
+ prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate)
+ }
+
+ override suspend fun deleteMessageByGroup(group: String) {
+ prioritizationMessageRepository.deleteGroup(group)
+ log.info("Prioritization Messages group($group) deleted successfully.")
+ }
+
+ override suspend fun deleteMessageStates(group: String, states: List<String>) {
+ prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+ log.info("Prioritization Messages group($group) with states($states) deleted successfully.")
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
new file mode 100644
index 000000000..305e64ba4
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -0,0 +1,120 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+/** Sample Prioritization Service, Define spring service injector to register in application*/
+open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) {
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) {
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+class SampleMessagePrioritizationHandler(
+ private val messagePrioritizationService: MessagePrioritizationService,
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) {
+
+ private val log = logger(SampleMessagePrioritizationHandler::class)
+
+ suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ log.info("messages(${messages.ids()}) aggregated")
+ /** Sequence based on Priority and Updated Date */
+ val sequencedMessage = messages.orderByHighestPriority()
+ /** Update all messages to aggregated state */
+ messagePrioritizationStateService.setMessagesState(
+ sequencedMessage.ids(),
+ MessageState.AGGREGATED.name
+ )
+ messagePrioritizationService.output(sequencedMessage)
+ }
+
+ fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return when (messagePrioritization.group) {
+ /** Dummy Implementation, This can also be read from file and stored as cached map **/
+ "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ "pass-typed" -> arrayListOf(messagePrioritization.type)
+ else -> null
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
new file mode 100644
index 000000000..7ab0be098
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+object MessageCorrelationUtils {
+
+ /** Assumption is message is of same group **/
+ fun correlatedMessages(collectedMessages: List<MessagePrioritization>): CorrelationCheckResponse {
+ val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated")
+ if (collectedMessages.size > 1) {
+ val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() }
+ if (filteredMessage.isNotEmpty()) {
+ val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() }
+ if (groupedMessage.size == 1) {
+ correlationCheckResponse.correlated = true
+ correlationCheckResponse.message = null
+ }
+ }
+ } else {
+ correlationCheckResponse.message = "received only one message for that group"
+ }
+ return correlationCheckResponse
+ }
+
+ /** Assumption is message is of same group and checking for required types **/
+ fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?):
+ CorrelationCheckResponse {
+
+ return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
+
+ val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id }
+ if (!unknownMessageTypes.isNullOrEmpty()) {
+ throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)")
+ }
+
+ val copyTypes = types.toTypedArray().copyOf().toMutableList()
+
+ val filteredMessage = collectedMessages.filter {
+ !it.correlationId.isNullOrBlank() &&
+ types.contains(it.type)
+ }
+ var correlatedKeys: MutableSet<String> = mutableSetOf()
+ if (filteredMessage.isNotEmpty()) {
+ val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() }
+ val foundType = correlatedMap.keys.map { it.type }
+ copyTypes.removeAll(foundType)
+ correlatedKeys = correlatedMap.keys.map {
+ it.correlationId
+ }.toMutableSet()
+ }
+ /** Check if any Types missing and same correlation id for all types */
+ return if (copyTypes.isEmpty()) {
+ if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true)
+ else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)")
+ } else {
+ CorrelationCheckResponse(message = "couldn't find types($copyTypes)")
+ }
+ } else {
+ return correlatedMessages(collectedMessages)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
new file mode 100644
index 000000000..2c4ae30da
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -0,0 +1,148 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
+import java.util.Date
+import java.util.UUID
+
+object MessagePrioritizationSample {
+
+ fun samplePrioritizationConfiguration(): PrioritizationConfiguration {
+ return PrioritizationConfiguration().apply {
+ kafkaConfiguration = KafkaConfiguration().apply {
+ inputTopicSelector = "prioritize-input"
+ outputTopic = "prioritize-output-topic"
+ expiredTopic = "prioritize-expired-topic"
+ }
+ natsConfiguration = NatsConfiguration().apply {
+ connectionSelector = "cds-controller"
+ inputSubject = "prioritize-input"
+ outputSubject = "prioritize-output"
+ expiredSubject = "prioritize-expired"
+ }
+ expiryConfiguration = ExpiryConfiguration().apply {
+ frequencyMilli = 10000L
+ maxPollRecord = 2000
+ }
+ shutDownConfiguration = ShutDownConfiguration().apply {
+ waitMill = 2000L
+ }
+ cleanConfiguration = CleanConfiguration().apply {
+ frequencyMilli = 10000L
+ expiredRecordsHoldDays = 5
+ }
+ }
+ }
+
+ fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration {
+ return PrioritizationConfiguration().apply {
+ expiryConfiguration = ExpiryConfiguration().apply {
+ frequencyMilli = 10L
+ maxPollRecord = 2000
+ }
+ shutDownConfiguration = ShutDownConfiguration().apply {
+ waitMill = 20L
+ }
+ cleanConfiguration = CleanConfiguration().apply {
+ frequencyMilli = 10L
+ expiredRecordsHoldDays = 5
+ }
+ }
+ }
+
+ private fun currentDatePlusDays(days: Int): Date {
+ return controllerDate().addDate(days)
+ }
+
+ fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
+ return sampleMessages("sample-group", messageState, count)
+ }
+
+ fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+ val messages: MutableList<MessagePrioritization> = arrayListOf()
+ repeat(count) {
+ val backPressureMessage = createMessage(
+ groupName, messageState,
+ "sample-type", null
+ )
+ messages.add(backPressureMessage)
+ }
+ return messages
+ }
+
+ fun sampleMessageWithSameCorrelation(
+ groupName: String,
+ messageState: String,
+ count: Int
+ ): List<MessagePrioritization> {
+ val messages: MutableList<MessagePrioritization> = arrayListOf()
+ repeat(count) {
+ val backPressureMessage = createMessage(
+ groupName, messageState, "sample-type",
+ "key1=value1,key2=value2"
+ )
+ messages.add(backPressureMessage)
+ }
+ return messages
+ }
+
+ fun sampleMessageWithDifferentTypeSameCorrelation(
+ groupName: String,
+ messageState: String,
+ count: Int
+ ): List<MessagePrioritization> {
+ val messages: MutableList<MessagePrioritization> = arrayListOf()
+ repeat(count) {
+ val backPressureMessage = createMessage(
+ groupName, messageState, "type-$it",
+ "key1=value1,key2=value2"
+ )
+ messages.add(backPressureMessage)
+ }
+ return messages
+ }
+
+ fun createMessage(
+ groupName: String,
+ messageState: String,
+ messageType: String,
+ messageCorrelationId: String?
+ ): MessagePrioritization {
+
+ return MessagePrioritization().apply {
+ id = UUID.randomUUID().toString()
+ group = groupName
+ type = messageType
+ state = messageState
+ priority = (1..10).shuffled().first()
+ correlationId = messageCorrelationId
+ message = "I am the Message"
+ createdDate = Date()
+ updatedDate = Date()
+ expiryDate = currentDatePlusDays(3)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
new file mode 100644
index 000000000..86cec3697
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+object MessageProcessorUtils {
+
+ /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
+ suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+
+ return if (clusterService != null && clusterService.clusterJoined() &&
+ !messagePrioritization.correlationId.isNullOrBlank()
+ ) {
+ // Get the correlation key in ascending order, even it it is misplaced
+ val correlationId = messagePrioritization.toFormatedCorrelation()
+ val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility to create the cluster lock for expiry scheduler*/
+ suspend fun prioritizationExpiryLock(): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritize-expiry"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility to create the cluster lock for expiry scheduler*/
+ suspend fun prioritizationCleanLock(): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+ return if (clusterService != null && clusterService.clusterJoined()) {
+ val lockName = "prioritize-clean"
+ val clusterLock = clusterService.clusterLock(lockName)
+ clusterLock.lock()
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+ clusterLock
+ } else null
+ }
+
+ /** Utility used to cluster unlock for message [clusterLock] */
+ suspend fun prioritizationUnLock(clusterLock: ClusterLock?) {
+ if (clusterLock != null) {
+ clusterLock.unLock()
+ clusterLock.close()
+ }
+ }
+
+ /** Get the Kafka Supplier for processor lookup [name] **/
+ fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
+ return ProcessorSupplier<K, V> {
+ // Dynamically resolve the Prioritization Processor
+ BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
new file mode 100644
index 000000000..286a9b5c1
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -0,0 +1,350 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import io.mockk.coEvery
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.context.ApplicationContext
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+@RunWith(SpringRunner::class)
+@DataJpaTest
+@DirtiesContext
+@ContextConfiguration(
+ classes = [
+ BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
+ ]
+)
+@TestPropertySource(
+ properties =
+ [
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
+ ]
+)
+open class MessagePrioritizationConsumerTest {
+
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
+ @Autowired
+ lateinit var applicationContext: ApplicationContext
+
+ @Autowired
+ lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
+
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Autowired
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
+
+ @Autowired
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+
+ @Before
+ fun setup() {
+ BluePrintDependencyService.inject(applicationContext)
+ }
+
+ @Test
+ fun testBluePrintKafkaJDBCKeyStore() {
+ runBlocking {
+ assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
+
+ val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
+ .instance(MessagePrioritizationStateService::class)
+ assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
+
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
+ val message = messagePrioritizationService.saveMessage(it)
+ val repoResult = messagePrioritizationService.getMessage(message.id)
+ assertNotNull(repoResult, "failed to get inserted message.")
+ }
+ }
+ }
+
+ @Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
+ @Test
+ fun testStartConsuming() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+
+ val streamingConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
+ assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyStreamingConsumerService = spyk(streamingConsumerService)
+ coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
+ coEvery { spyStreamingConsumerService.shutDown() } returns Unit
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService, mockk()
+ )
+ val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
+
+ // Test Topology
+ val kafkaStreamConsumerFunction =
+ spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+ val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
+ assertNotNull(topology, "failed to get create topology")
+
+ every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
+ spyMessagePrioritizationConsumer.startConsuming(configuration)
+ spyMessagePrioritizationConsumer.shutDown()
+ }
+ }
+
+ @Test
+ fun testSchedulerService() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationSchedulerService =
+ MessagePrioritizationSchedulerService(messagePrioritizationService)
+ launch {
+ messagePrioritizationSchedulerService.startScheduling()
+ }
+ launch {
+ /** To debug increase the delay time */
+ delay(20)
+ messagePrioritizationSchedulerService.shutdownScheduling()
+ }
+ }
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ // @Test
+ fun testKafkaMessagePrioritizationConsumer() {
+ runBlocking {
+
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val kafkaMessagePrioritizationService =
+ SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+ kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+ val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+ messagePrioritizationStateService,
+ kafkaMessagePrioritizationService
+ )
+
+ // Register the processor
+ BluePrintDependencyService.registerSingleton(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ defaultMessagePrioritizeProcessor
+ )
+
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService,
+ kafkaMessagePrioritizationService
+ )
+ messagePrioritizationConsumer.startConsuming(configuration)
+
+ /** Send sample message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(2000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
+ }
+ delay(10000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
+
+ /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+ * Start :
+ * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ * */
+ // @Test
+ fun testNatsMessagePrioritizationConsumer() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+ val inputSubject =
+ NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+ val natsMessagePrioritizationService =
+ SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+ natsMessagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationConsumer =
+ NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+ messagePrioritizationConsumer.startConsuming()
+
+ /** Send sample message with every 1 sec */
+ val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(200)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+ }
+ delay(3000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
new file mode 100644
index 000000000..22c399608
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
+import org.springframework.stereotype.Service
+import javax.sql.DataSource
+
+@Configuration
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"])
+@EnableAutoConfiguration
+open class TestDatabaseConfiguration {
+
+ @Bean("primaryDBLibGenericService")
+ open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService {
+ return PrimaryDBLibGenericService(
+ NamedParameterJdbcTemplate(dataSource)
+ )
+ }
+}
+
+/* Sample Prioritization Listener, used during Application startup
+@Component
+open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) {
+
+ private val log = logger(SamplePrioritizationListeners::class)
+
+ @EventListener(ApplicationReadyEvent::class)
+ open fun init() = runBlocking {
+ log.info("Starting PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer
+ .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+ }
+
+ @PreDestroy
+ open fun destroy() = runBlocking {
+ log.info("Shutting down PrioritizationListeners...")
+ defaultMessagePrioritizationConsumer.shutDown()
+ }
+}
+ */
+
+@Service
+open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
new file mode 100644
index 000000000..73d3738e5
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
@@ -0,0 +1,132 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
+import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
+
+class MessageCorrelationUtilsTest {
+
+ @Test
+ fun testCorrelationKeysReordered() {
+
+ val message1 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2"
+ )
+ val message2 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key2=value2,key1=value1"
+ )
+
+ val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
+ multipleMessages.add(message1)
+ multipleMessages.add(message2)
+ val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages)
+ assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered")
+ }
+
+ @Test
+ fun differentTypesWithSameCorrelationMessages() {
+ /** With Types **/
+ /* Assumption is Same group with different types */
+ val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
+ val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2")
+ )
+ assertTrue(
+ differentTypesWithSameCorrelationMessagesResponse.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResponse"
+ )
+
+ /* Assumption is Same group with different types and one missing expected types,
+ In this case type-3 message is missing */
+ val differentTypesWithSameCorrelationMessagesResWithMissingType =
+ MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2", "type-3")
+ )
+ assertTrue(
+ !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType"
+ )
+ }
+
+ @Test
+ fun withSameCorrelationMessagesWithIgnoredTypes() {
+ /** With ignoring Types */
+ /** Assumption is only one message received */
+ val withSameCorrelationOneMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
+ val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationOneMessages, null
+ )
+ assertTrue(
+ !withSameCorrelationOneMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp"
+ )
+
+ /** Assumption is two message received for same group with same correlation */
+ val withSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
+ val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationMessages, null
+ )
+ assertTrue(
+ withSameCorrelationMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp"
+ )
+ }
+
+ @Test
+ fun differentTypesWithDifferentCorrelationMessage() {
+ /** Assumption is two message received for same group with different expected types and different correlation */
+ val message1 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2"
+ )
+ val message2 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-1", "key1=value1,key2=value3"
+ )
+ val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
+ differentTypesWithDifferentCorrelationMessage.add(message1)
+ differentTypesWithDifferentCorrelationMessage.add(message2)
+ val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithDifferentCorrelationMessage,
+ arrayListOf("type-0", "type-1")
+ )
+ assertTrue(
+ !differentTypesWithDifferentCorrelationMessageResp.correlated,
+ "failed to correlate differentTypesWithDifferentCorrelationMessageResp"
+ )
+ }
+
+ @Test
+ fun testPrioritizationOrdering() {
+ val differentPriorityMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5)
+ val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority()
+ assertNotNull(orderedPriorityMessages, "failed to order the priority messages")
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..e3a1f7a01
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>${localPattern}</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate.type.descriptor.sql" level="warn"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>