summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion
diff options
context:
space:
mode:
authorSingal, Kapil (ks220y) <ks220y@att.com>2020-12-15 19:02:17 -0500
committerKAPIL SINGAL <ks220y@att.com>2020-12-16 01:05:29 +0000
commit5844724ca96d08c3b752effdb10fd2586755912d (patch)
tree865f3f6f1736347c2305fdacf15f31e667e9283f /ms/blueprintsprocessor/functions/message-prioritizaion
parentf38e495d47e69b5203940e1f3eb76145c2a30e83 (diff)
Fixing typo in message-prioritization
Refactoring few POMs name tag Issue-ID: CCSDK-3053 Signed-off-by: Singal, Kapil (ks220y) <ks220y@att.com> Change-Id: I14447ea7f93efcc970213bbe7d42663cb87e33d7
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/README.md26
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml43
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt33
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt89
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt37
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt72
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt67
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt80
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt89
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt175
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt84
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt28
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt78
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt108
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt64
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt85
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt92
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt203
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt98
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt176
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt120
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt82
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt148
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt86
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt350
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt65
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt132
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml42
28 files changed, 0 insertions, 2752 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
deleted file mode 100644
index cda43faca..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
+++ /dev/null
@@ -1,26 +0,0 @@
-
-To Delete Topics
-------------------
-kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic
-kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic
-kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog
-
-Create Topics
---------------
-
-kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
-kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
-
-To List topics
-----------------
-kafka-topics --list --bootstrap-server localhost:9092
-
-To publish message
---------------------
-kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic
-
-To Listen for Output
-----------------------
-kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
-
-kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
deleted file mode 100644
index e8467d02c..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright © 2018-2019 AT&T Intellectual Property.
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>blueprintsprocessor-functions</artifactId>
- <version>1.1.0-SNAPSHOT</version>
- </parent>
-
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId>
- <artifactId>message-prioritizaion</artifactId>
-
- <name>MS Blueprints Processor Functions - Message Prioritization</name>
- <description>Blueprints Processor Function - Message Prioritization</description>
-
- <dependencies>
- <dependency>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor.modules</groupId>
- <artifactId>message-lib</artifactId>
- </dependency>
- <dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
deleted file mode 100644
index 890e0a6ba..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.Configuration
-
-@Configuration
-@ComponentScan
-open class MessagePrioritizationConfiguration
-
-object MessagePrioritizationConstants {
-
- const val SOURCE_INPUT = "source-prioritization-input"
-
- const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
-
- const val SINK_OUTPUT = "sink-prioritization-output"
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
deleted file mode 100644
index 65b7644a8..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import java.io.Serializable
-
-object MessageActionConstants {
-
- const val PRIORITIZE = "prioritize"
-}
-
-enum class MessageState(val id: String) {
- NEW("new"),
- WAIT("wait"),
- EXPIRED("expired"),
- PRIORITIZED("prioritized"),
- AGGREGATED("aggregated"),
- COMPLETED("completed"),
- ERROR("error")
-}
-
-open class PrioritizationConfiguration : Serializable {
-
- lateinit var expiryConfiguration: ExpiryConfiguration
- lateinit var shutDownConfiguration: ShutDownConfiguration
- lateinit var cleanConfiguration: CleanConfiguration
- var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
- var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration
-}
-
-open class KafkaConfiguration : Serializable {
-
- lateinit var inputTopicSelector: String // Consumer Configuration Selector
- lateinit var expiredTopic: String // Publish Configuration Selector
- lateinit var outputTopic: String // Publish Configuration Selector
-}
-
-open class NatsConfiguration : Serializable {
-
- lateinit var connectionSelector: String // Consumer Configuration Selector
- lateinit var inputSubject: String // Publish Configuration Selector
- lateinit var expiredSubject: String // Publish Configuration Selector
- lateinit var outputSubject: String // Publish Configuration Selector
-}
-
-open class ExpiryConfiguration : Serializable {
-
- var frequencyMilli: Long = 30000L
- var maxPollRecord: Int = 1000
-}
-
-open class ShutDownConfiguration : Serializable {
-
- var waitMill: Long = 30000L
-}
-
-open class CleanConfiguration : Serializable {
-
- var frequencyMilli: Long = 30000L
- var expiredRecordsHoldDays: Int = 5
-}
-
-open class UpdateStateRequest : Serializable {
-
- lateinit var id: String
- var group: String? = null
- var state: String? = null
-}
-
-data class CorrelationCheckResponse(
- var message: String? = null,
- var correlated: Boolean = false
-)
-
-data class TypeCorrelationKey(val type: String, val correlationId: String)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
deleted file mode 100644
index dfe516953..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-
-interface MessagePrioritizationService {
-
- fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration)
-
- fun getConfiguration(): PrioritizationConfiguration
-
- suspend fun prioritize(messagePrioritization: MessagePrioritization)
-
- /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
- suspend fun output(messages: List<MessagePrioritization>)
-
- /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */
- suspend fun updateExpiredMessages()
-
- /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */
- suspend fun cleanExpiredMessage()
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
deleted file mode 100644
index 2e5e6c617..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import java.util.Date
-
-interface MessagePrioritizationStateService {
-
- suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
-
- suspend fun getMessage(id: String): MessagePrioritization
-
- suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
-
- suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
-
- suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>?
-
- suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
-
- suspend fun getCorrelatedMessages(
- group: String,
- states: List<String>,
- types: List<String>?,
- correlationIds: String
- ): List<MessagePrioritization>?
-
- suspend fun updateMessagesState(ids: List<String>, state: String)
-
- suspend fun updateMessageState(id: String, state: String): MessagePrioritization
-
- suspend fun setMessageState(id: String, state: String)
-
- suspend fun setMessagesPriority(ids: List<String>, priority: String)
-
- suspend fun setMessagesState(ids: List<String>, state: String)
-
- suspend fun setMessageStateANdError(id: String, state: String, error: String)
-
- suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
-
- suspend fun deleteMessage(id: String)
-
- suspend fun deleteMessages(id: List<String>)
-
- suspend fun deleteExpiredMessage(retentionDays: Int)
-
- suspend fun deleteMessageByGroup(group: String)
-
- suspend fun deleteMessageStates(group: String, states: List<String>)
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
deleted file mode 100644
index d8e71d413..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-
-/**
- * Register the MessagePrioritizationStateService and exposed dependency
- */
-fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService =
- instance(MessagePrioritizationStateService::class)
-
-/**
- * Expose messagePrioritizationStateService to AbstractComponentFunction
- */
-fun AbstractComponentFunction.messagePrioritizationStateService() =
- BluePrintDependencyService.messagePrioritizationStateService()
-
-/**
- * MessagePrioritization correlation extensions
- */
-
-/**
- * Arrange comma separated correlation keys in ascending order.
- */
-fun MessagePrioritization.toFormatedCorrelation(): String {
- return this.correlationId!!.split(",")
- .map { it.trim() }.sorted().joinToString(",")
-}
-
-/**
- * Used to group the correlation with respect to types.
- */
-fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
- return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
-}
-
-/** get list of message ids **/
-fun List<MessagePrioritization>.ids(): List<String> {
- return this.map { it.id }
-}
-
-/** Ordered by highest priority and updated date **/
-fun List<MessagePrioritization>.orderByHighestPriority(): List<MessagePrioritization> {
- return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate))
-}
-
-/** Ordered by Updated date **/
-fun List<MessagePrioritization>.orderByUpdatedDate(): List<MessagePrioritization> {
- return this.sortedWith(compareBy(MessagePrioritization::updatedDate))
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
deleted file mode 100644
index c7aab03b6..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.mdcWebCoroutineScope
-import org.springframework.http.MediaType
-import org.springframework.web.bind.annotation.GetMapping
-import org.springframework.web.bind.annotation.PathVariable
-import org.springframework.web.bind.annotation.PostMapping
-import org.springframework.web.bind.annotation.RequestBody
-import org.springframework.web.bind.annotation.RequestMapping
-import org.springframework.web.bind.annotation.ResponseBody
-import org.springframework.web.bind.annotation.RestController
-
-@RestController
-@RequestMapping(value = ["/api/v1/message-prioritization"])
-open class MessagePrioritizationApi(
- private val messagePrioritizationStateService: MessagePrioritizationStateService,
- private val messagePrioritizationService: MessagePrioritizationService
-) {
-
- @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
- @ResponseBody
- suspend fun ping(): String = mdcWebCoroutineScope { "Success" }
-
- @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
- @ResponseBody
- suspend fun messagePrioritization(@PathVariable(value = "id") id: String) = mdcWebCoroutineScope {
- messagePrioritizationStateService.getMessage(id)
- }
-
- @PostMapping(
- path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE],
- consumes = [MediaType.APPLICATION_JSON_VALUE]
- )
- @ResponseBody
- suspend fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) =
- mdcWebCoroutineScope {
- messagePrioritizationStateService.saveMessage(messagePrioritization)
- }
-
- @PostMapping(
- path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE],
- consumes = [MediaType.APPLICATION_JSON_VALUE]
- )
- @ResponseBody
- suspend fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = mdcWebCoroutineScope {
- messagePrioritizationService.prioritize(messagePrioritization)
- }
-
- @PostMapping(
- path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
- consumes = [MediaType.APPLICATION_JSON_VALUE]
- )
- suspend fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) =
- mdcWebCoroutineScope {
- messagePrioritizationStateService.setMessageState(
- updateMessageState.id,
- updateMessageState.state!!
- )
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt
deleted file mode 100644
index ce2085f68..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
-
-import com.fasterxml.jackson.annotation.JsonFormat
-import org.hibernate.annotations.Proxy
-import org.springframework.data.annotation.LastModifiedDate
-import org.springframework.data.jpa.domain.support.AuditingEntityListener
-import org.springframework.data.jpa.repository.config.EnableJpaAuditing
-import java.util.Date
-import javax.persistence.Column
-import javax.persistence.Entity
-import javax.persistence.EntityListeners
-import javax.persistence.Id
-import javax.persistence.Lob
-import javax.persistence.Table
-import javax.persistence.Temporal
-import javax.persistence.TemporalType
-
-@EnableJpaAuditing
-@EntityListeners(AuditingEntityListener::class)
-@Entity
-@Table(name = "MESSAGE_PRIORITIZATION")
-@Proxy(lazy = false)
-open class MessagePrioritization {
-
- @Id
- @Column(name = "message_id", length = 50)
- lateinit var id: String
-
- @Column(name = "message_group", length = 50, nullable = false)
- lateinit var group: String
-
- @Column(name = "message_type", length = 50, nullable = false)
- lateinit var type: String
-
- /** States Defined by MessageState */
- @Column(name = "message_state", length = 20, nullable = false)
- lateinit var state: String
-
- @Column(name = "priority", nullable = false)
- var priority: Int = 5
-
- @Lob
- @Column(name = "message", nullable = false)
- var message: String? = null
-
- @Lob
- @Column(name = "error", nullable = true)
- var error: String? = null
-
- @Lob
- @Column(name = "aggregated_message_ids", nullable = true)
- var aggregatedMessageIds: String? = null
-
- @Lob
- @Column(name = "correlation_id", nullable = true)
- var correlationId: String? = null
-
- @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- @Temporal(TemporalType.TIMESTAMP)
- @Column(name = "created_date", nullable = false)
- var createdDate = Date()
-
- @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- @LastModifiedDate
- @Temporal(TemporalType.TIMESTAMP)
- @Column(name = "updated_date", nullable = false)
- var updatedDate: Date? = null
-
- @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
- @Temporal(TemporalType.TIMESTAMP)
- @Column(name = "expiry_date", nullable = false)
- var expiryDate: Date? = null
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
deleted file mode 100644
index 0b35e3856..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
-
-import org.springframework.data.domain.Pageable
-import org.springframework.data.jpa.repository.JpaRepository
-import org.springframework.data.jpa.repository.Modifying
-import org.springframework.data.jpa.repository.Query
-import org.springframework.stereotype.Repository
-import org.springframework.transaction.annotation.Transactional
-import java.util.Date
-
-@Repository
-@Transactional(readOnly = true)
-interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> {
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc")
- fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "ORDER BY pm.createdDate asc"
- )
- fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "ORDER BY pm.updatedDate asc"
- )
- fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable):
- List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
- )
- fun findByGroupAndStateInAndNotExpiredDate(
- group: String,
- states: List<String>,
- expiryCheckDate: Date,
- count: Pageable
- ): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.state in :states " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
- )
- fun findByStateInAndExpiredDate(
- states: List<String>,
- expiryCheckDate: Date,
- count: Pageable
- ): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
- )
- fun findByGroupAndStateInAndExpiredDate(
- group: String,
- states: List<String>,
- expiryCheckDate: Date,
- count: Pageable
- ): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
- )
- fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
- )
- fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
- )
- fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String):
- List<MessagePrioritization>?
-
- @Query(
- "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
- )
- fun findByGroupAndTypesAndCorrelationId(
- group: String,
- states: List<String>,
- types: List<String>,
- correlationId: String
- ): List<MessagePrioritization>?
-
- @Modifying
- @Transactional
- @Query(
- "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
- "WHERE id = :id"
- )
- fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query(
- "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
- "WHERE id = :id"
- )
- fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query(
- "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
- "WHERE id IN :ids"
- )
- fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query(
- "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
- "WHERE id IN :ids"
- )
- fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query(
- "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
- "WHERE id = :id"
- )
- fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query(
- "UPDATE MessagePrioritization SET state = :state, " +
- "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
- )
- fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("DELETE FROM MessagePrioritization WHERE id IN :ids")
- fun deleteByIds(ids: List<String>)
-
- @Modifying
- @Transactional
- @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ")
- fun deleteByExpiryDate(expiryCheckDate: Date)
-
- @Modifying
- @Transactional
- @Query("DELETE FROM MessagePrioritization WHERE group = :group")
- fun deleteGroup(group: String)
-
- @Modifying
- @Transactional
- @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states")
- fun deleteGroupAndStateIn(group: String, states: List<String>)
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
deleted file mode 100644
index 112a80379..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-abstract class AbstractKafkaMessagePrioritizationService(
- private val messagePrioritizationStateService: MessagePrioritizationStateService
-) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
-
- private val log = logger(AbstractKafkaMessagePrioritizationService::class)
-
- lateinit var processorContext: ProcessorContext
-
- fun setKafkaProcessorContext(processorContext: ProcessorContext) {
- this.processorContext = processorContext
- }
-
- override suspend fun output(messages: List<MessagePrioritization>) {
- log.info("$$$$$ received in output processor id(${messages.ids()})")
- checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
- check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
-
- messages.forEach { message ->
- val updatedMessage =
- messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
- processorContext.forward(
- updatedMessage.id,
- updatedMessage,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
-
- override suspend fun updateExpiredMessages() {
- checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
- check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
-
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
- try {
- val fetchMessages = messagePrioritizationStateService
- .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
- val expiredIds = fetchMessages?.ids()
- if (expiredIds != null && expiredIds.isNotEmpty()) {
- messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- fetchMessages.forEach { expiredMessage ->
- expiredMessage.state = MessageState.EXPIRED.name
- processorContext.forward(
- expiredMessage.id, expiredMessage,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
- } catch (e: Exception) {
- log.error("failed in updating expired messages", e)
- } finally {
- MessageProcessorUtils.prioritizationUnLock(clusterLock)
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
deleted file mode 100644
index d4f8470c8..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-
-/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
-abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
-
- override fun init(processorContext: ProcessorContext) {
- this.processorContext = processorContext
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
deleted file mode 100644
index 1b0612492..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-
-open class DefaultMessagePrioritizeProcessor(
- private val messagePrioritizationStateService: MessagePrioritizationStateService,
- private val kafkaMessagePrioritizationService: MessagePrioritizationService
-) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
-
- private val log = logger(DefaultMessagePrioritizeProcessor::class)
-
- override suspend fun processNB(key: ByteArray, value: ByteArray) {
-
- val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
- try {
- kafkaMessagePrioritizationService.prioritize(messagePrioritize)
- } catch (e: Exception) {
- messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
- log.error(messagePrioritize.error)
- /** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(
- messagePrioritize.id, MessageState.ERROR.name,
- messagePrioritize.error!!
- )
- /** Publish to Output topic */
- this.processorContext.forward(
- messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
-
- override fun init(context: ProcessorContext) {
- super.init(context)
- /** Set Configuration and Processor Context to messagePrioritizationService */
- if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) {
- kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext)
- } else {
- throw BluePrintProcessorException(
- "messagePrioritizationService is not instance of " +
- "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}"
- )
- }
- }
-
- override fun close() {
- log.info(
- "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
deleted file mode 100644
index 4ab399f54..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
-
-open class KafkaMessagePrioritizationConsumer(
- private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
- private val kafkaMessagePrioritizationService: MessagePrioritizationService
-) {
-
- private val log = logger(KafkaMessagePrioritizationConsumer::class)
-
- private lateinit var streamingConsumerService: BlueprintMessageConsumerService
-
- open fun consumerService(selector: String): BlueprintMessageConsumerService {
- return bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(selector)
- }
-
- open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
- KafkaStreamConsumerFunction {
- return object : KafkaStreamConsumerFunction {
-
- val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
- ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
-
- override suspend fun createTopology(
- messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?
- ): Topology {
-
- val topology = Topology()
- val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
-
- val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
- log.info("Consuming prioritization topics($topics)")
-
- topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
-
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- bluePrintProcessorSupplier<ByteArray, ByteArray>(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- ),
- MessagePrioritizationConstants.SOURCE_INPUT
- )
-
- /** To receive completed and error messages */
- topology.addSink(
- MessagePrioritizationConstants.SINK_OUTPUT,
- kafkaConsumerConfiguration.outputTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
- // Output will be sent to the group-output topic from Processor API
- return topology
- }
- }
- }
-
- suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
-
- val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
- ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
-
- streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)
-
- // Dynamic Consumer Function to create Topology
- val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
- streamingConsumerService.consume(null, consumerFunction)
- }
-
- suspend fun shutDown() {
- if (::streamingConsumerService.isInitialized) {
- streamingConsumerService.shutDown()
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
deleted file mode 100644
index 5595863d4..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.common.serialization.Deserializer
-import org.apache.kafka.common.serialization.Serde
-import org.apache.kafka.common.serialization.Serializer
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.nio.charset.Charset
-
-open class MessagePrioritizationSerde : Serde<MessagePrioritization> {
-
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- }
-
- override fun close() {
- }
-
- override fun deserializer(): Deserializer<MessagePrioritization> {
- return object : Deserializer<MessagePrioritization> {
- override fun deserialize(topic: String, data: ByteArray): MessagePrioritization {
- return JacksonUtils.readValue(String(data), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
- }
-
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- }
-
- override fun close() {
- }
- }
- }
-
- override fun serializer(): Serializer<MessagePrioritization> {
- return object : Serializer<MessagePrioritization> {
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- }
-
- override fun serialize(topic: String?, data: MessagePrioritization): ByteArray {
- return data.asJsonString().toByteArray(Charset.defaultCharset())
- }
-
- override fun close() {
- }
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
deleted file mode 100644
index 502a7822d..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-abstract class AbstractNatsMessagePrioritizationService(
- private val messagePrioritizationStateService: MessagePrioritizationStateService
-) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
-
- private val log = logger(AbstractNatsMessagePrioritizationService::class)
-
- lateinit var bluePrintNatsService: BluePrintNatsService
-
- override suspend fun output(messages: List<MessagePrioritization>) {
- log.info("$$$$$ received in output processor id(${messages.ids()})")
- checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
- check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
-
- val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject
- messages.forEach { message ->
- val updatedMessage =
- messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
-
- /** send to the output subject */
- bluePrintNatsService.publish(
- NatsClusterUtils.currentApplicationSubject(outputSubject),
- updatedMessage.asJsonType().asByteArray()
- )
- }
- }
-
- override suspend fun updateExpiredMessages() {
- checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
- check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
-
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject
- val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
- try {
- val fetchMessages = messagePrioritizationStateService
- .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
- val expiredIds = fetchMessages?.ids()
- if (!expiredIds.isNullOrEmpty()) {
- messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- fetchMessages.forEach { expiredMessage ->
- expiredMessage.state = MessageState.EXPIRED.name
- /** send to the output subject */
- bluePrintNatsService.publish(
- NatsClusterUtils.currentApplicationSubject(outputSubject),
- expiredMessage.asJsonType().asByteArray()
- )
- }
- }
- } catch (e: Exception) {
- log.error("failed in updating expired messages", e)
- } finally {
- MessageProcessorUtils.prioritizationUnLock(clusterLock)
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
deleted file mode 100644
index a0b2cf462..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
-
-import io.nats.streaming.MessageHandler
-import io.nats.streaming.Subscription
-import kotlinx.coroutines.runBlocking
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.asType
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
-
-open class NatsMessagePrioritizationConsumer(
- private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
- private val natsMessagePrioritizationService: MessagePrioritizationService
-) {
-
- private val log = logger(NatsMessagePrioritizationConsumer::class)
-
- lateinit var bluePrintNatsService: BluePrintNatsService
- private lateinit var subscription: Subscription
-
- suspend fun startConsuming() {
- val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration()
- val natsConfiguration = prioritizationConfiguration.natsConfiguration
- ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration")
-
- check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) {
- "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService."
- }
- bluePrintNatsService = consumerService(natsConfiguration.connectionSelector)
- natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService
- val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)
- val loadBalanceGroup = ClusterUtils.applicationName()
- val messageHandler = createMessageHandler()
- val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject))
- subscription = bluePrintNatsService.loadBalanceSubscribe(
- inputSubject,
- loadBalanceGroup,
- messageHandler,
- subscriptionOptions
- )
- log.info(
- "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)."
- )
- }
-
- suspend fun shutDown() {
- if (::subscription.isInitialized) {
- subscription.unsubscribe()
- }
- log.info("Nats prioritization consumer listener shutdown complete")
- }
-
- private fun consumerService(selector: String): BluePrintNatsService {
- return bluePrintNatsLibPropertyService.bluePrintNatsService(selector)
- }
-
- private fun createMessageHandler(): MessageHandler {
- return MessageHandler { message ->
- try {
- val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java)
- runBlocking {
- natsMessagePrioritizationService.prioritize(messagePrioritization)
- }
- } catch (e: Exception) {
- log.error("failed to process prioritize message", e)
- }
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
deleted file mode 100644
index f4602a810..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
-abstract class AbstractMessagePrioritizationService(
- private val messagePrioritizationStateService: MessagePrioritizationStateService
-) : MessagePrioritizationService {
-
- private val log = logger(AbstractMessagePrioritizationService::class)
-
- lateinit var prioritizationConfiguration: PrioritizationConfiguration
-
- override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
- this.prioritizationConfiguration = prioritizationConfiguration
- }
-
- override fun getConfiguration(): PrioritizationConfiguration {
- return this.prioritizationConfiguration
- }
-
- override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
- try {
- log.info("***** received in prioritize processor key(${messagePrioritize.id})")
- check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
-
- /** Get the cluster lock for message group */
- val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
- // Save the Message
- messagePrioritizationStateService.saveMessage(messagePrioritize)
- handleCorrelationAndNextStep(messagePrioritize)
- /** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationUnLock(clusterLock)
- } catch (e: Exception) {
- messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
- log.error(messagePrioritize.error)
- /** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(
- messagePrioritize.id, MessageState.ERROR.name,
- messagePrioritize.error!!
- )
- }
- }
-
- override suspend fun output(messages: List<MessagePrioritization>) {
- log.info("$$$$$ received in output processor id(${messages.ids()})")
- messages.forEach { message ->
- messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
- }
- }
-
- override suspend fun updateExpiredMessages() {
- check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
-
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
- try {
- val fetchMessages = messagePrioritizationStateService
- .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
- val expiredIds = fetchMessages?.ids()
- if (!expiredIds.isNullOrEmpty()) {
- messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- }
- } catch (e: Exception) {
- log.error("failed in updating expired messages", e)
- } finally {
- MessageProcessorUtils.prioritizationUnLock(clusterLock)
- }
- }
-
- override suspend fun cleanExpiredMessage() {
- check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
-
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
- try {
- messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
- } catch (e: Exception) {
- log.error("failed in clean expired messages", e)
- } finally {
- MessageProcessorUtils.prioritizationUnLock(clusterLock)
- }
- }
-
- open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
- /** Check correlation enabled and correlation field has populated */
- if (!messagePrioritization.correlationId.isNullOrBlank()) {
- val id = messagePrioritization.id
- val group = messagePrioritization.group
- val correlationId = messagePrioritization.correlationId!!
- val types = getGroupCorrelationTypes(messagePrioritization)
- log.info(
- "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
- "correlation types($types), priority(${messagePrioritization.priority}), " +
- "correlation id($correlationId)"
- )
-
- /** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService
- .getCorrelatedMessages(
- group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
- )
-
- /** If multiple records found, then check correlation */
- if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
- /** Check all correlation satisfies */
- val correlationResults = MessageCorrelationUtils
- .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
-
- if (correlationResults.correlated) {
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(
- waitingCorrelatedStoreMessages.ids(),
- MessageState.PRIORITIZED.name
- )
- /** Correlation satisfied, Send only correlated messages to aggregate processor */
- aggregate(waitingCorrelatedStoreMessages)
- } else {
- /** Correlation not satisfied */
- log.trace("correlation not matched : ${correlationResults.message}")
- // Update the Message state to Wait
- messagePrioritizationStateService.setMessagesState(
- waitingCorrelatedStoreMessages.ids(),
- MessageState.WAIT.name
- )
- }
- } else {
- /** received first message of group and correlation Id, update the message with wait state */
- messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
- }
- } else {
- /** No Correlation check needed, simply forward to next processor. */
- messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- aggregate(arrayListOf(messagePrioritization))
- }
- }
-
- open suspend fun aggregate(messages: List<MessagePrioritization>) {
- log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
- if (!messages.isNullOrEmpty()) {
- try {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(messages)
- } catch (e: Exception) {
- val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
- if (!messages.isNullOrEmpty()) {
- messages.forEach { messagePrioritization ->
- try {
- /** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(
- messagePrioritization.id,
- MessageState.ERROR.name, error
- )
- } catch (sendException: Exception) {
- log.error(
- "failed to update/publish error message(${messagePrioritization.id}) : " +
- "${sendException.message}",
- e
- )
- }
- }
- /** Publish to output topic */
- output(messages)
- }
- }
- }
- }
-
- /** Child will override this implementation , if necessary
- * Here the place child has to implement custom Sequencing and Aggregation logic.
- * */
- abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
-
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
deleted file mode 100644
index 529d773a4..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.springframework.stereotype.Service
-
-@Service
-open class MessagePrioritizationSchedulerService(
- private val messagePrioritizationService: MessagePrioritizationService
-) {
-
- private val log = logger(MessagePrioritizationSchedulerService::class)
-
- @Volatile
- var keepGoing = true
-
- /** This is sample scheduler implementation used during starting application with configuration.
- @EventListener(ApplicationReadyEvent::class)
- open fun init() = runBlocking {
- log.info("Starting PrioritizationListeners...")
- startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
- }
- */
-
- open suspend fun startScheduling() {
- val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
-
- log.info("Starting Prioritization Scheduler Service...")
- GlobalScope.launch {
- expiryScheduler(prioritizationConfiguration)
- }
- GlobalScope.launch {
- cleanUpScheduler(prioritizationConfiguration)
- }
- }
-
- open suspend fun shutdownScheduling() {
- keepGoing = false
- val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
- delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
- }
-
- private suspend fun expiryScheduler(
- prioritizationConfiguration: PrioritizationConfiguration
- ) {
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
- withContext(Dispatchers.Default) {
- while (keepGoing) {
- try {
- messagePrioritizationService.updateExpiredMessages()
- delay(expiryConfiguration.frequencyMilli)
- } catch (e: Exception) {
- log.error("failed in prioritization expiry scheduler", e)
- }
- }
- }
- }
-
- private suspend fun cleanUpScheduler(
- prioritizationConfiguration: PrioritizationConfiguration
- ) {
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
- withContext(Dispatchers.Default) {
- while (keepGoing) {
- try {
- messagePrioritizationService.cleanExpiredMessage()
- delay(cleanConfiguration.frequencyMilli)
- } catch (e: Exception) {
- log.error("failed in prioritization clean scheduler", e)
- }
- }
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
deleted file mode 100644
index ed16fd44f..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
-import org.springframework.data.domain.PageRequest
-import org.springframework.stereotype.Service
-import org.springframework.transaction.annotation.Transactional
-import java.util.Date
-
-@Service
-open class MessagePrioritizationStateServiceImpl(
- private val prioritizationMessageRepository: PrioritizationMessageRepository
-) : MessagePrioritizationStateService {
-
- private val log = logger(MessagePrioritizationStateServiceImpl::class)
-
- @Transactional
- override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
- if (!message.correlationId.isNullOrBlank()) {
- message.correlationId = message.toFormatedCorrelation()
- }
- message.updatedDate = Date()
- return prioritizationMessageRepository.save(message)
- }
-
- override suspend fun getMessage(id: String): MessagePrioritization {
- return prioritizationMessageRepository.findById(id).orElseGet(null)
- ?: throw BluePrintProcessorException("couldn't find message for id($id)")
- }
-
- override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
- return prioritizationMessageRepository.findAllById(ids)
- }
-
- override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
- return prioritizationMessageRepository
- .findByStateInAndExpiredDate(
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
- Date(), PageRequest.of(0, count)
- )
- }
-
- override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
- group,
- states, Date(), PageRequest.of(0, count)
- )
- }
-
- override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
- group,
- states, Date(), PageRequest.of(0, count)
- )
- }
-
- override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByExpiredDate(
- expiryDate, PageRequest.of(0, count)
- )
- }
-
- override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
- List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndExpiredDate(
- group,
- expiryDate, PageRequest.of(0, count)
- )
- }
-
- override suspend fun getCorrelatedMessages(
- group: String,
- states: List<String>,
- types: List<String>?,
- correlationIds: String
- ): List<MessagePrioritization>? {
- return if (!types.isNullOrEmpty()) {
- prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
- } else {
- prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
- }
- }
-
- @Transactional
- override suspend fun updateMessagesState(ids: List<String>, state: String) {
- ids.forEach {
- val updated = updateMessageState(it, state)
- log.info("message($it) update to state(${updated.state})")
- }
- }
-
- @Transactional
- override suspend fun setMessageState(id: String, state: String) {
- prioritizationMessageRepository.setStateForMessageId(id, state, Date())
- }
-
- @Transactional
- override suspend fun setMessagesPriority(ids: List<String>, priority: String) {
- prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date())
- }
-
- @Transactional
- override suspend fun setMessagesState(ids: List<String>, state: String) {
- prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
- }
-
- @Transactional
- override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
- prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
- }
-
- @Transactional
- override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
- val updateMessage = getMessage(id).apply {
- this.updatedDate = Date()
- this.state = state
- }
- return saveMessage(updateMessage)
- }
-
- @Transactional
- override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
- val groupedIds = aggregatedIds.joinToString(",")
- prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date())
- }
-
- override suspend fun deleteMessage(id: String) {
- prioritizationMessageRepository.deleteById(id)
- log.info("Prioritization Messages $id deleted successfully.")
- }
-
- override suspend fun deleteMessages(ids: List<String>) {
- prioritizationMessageRepository.deleteByIds(ids)
- log.info("Prioritization Messages $ids deleted successfully.")
- }
-
- override suspend fun deleteExpiredMessage(retentionDays: Int) {
- val expiryCheckDate = controllerDate().addDate(retentionDays)
- prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate)
- }
-
- override suspend fun deleteMessageByGroup(group: String) {
- prioritizationMessageRepository.deleteGroup(group)
- log.info("Prioritization Messages group($group) deleted successfully.")
- }
-
- override suspend fun deleteMessageStates(group: String, states: List<String>) {
- prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
- log.info("Prioritization Messages group($group) with states($states) deleted successfully.")
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
deleted file mode 100644
index 305e64ba4..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-/** Sample Prioritization Service, Define spring service injector to register in application*/
-open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
- AbstractMessagePrioritizationService(messagePrioritizationStateService) {
-
- /** Child overriding this implementation , if necessary */
- override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
- val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
- this, messagePrioritizationStateService
- )
- sampleMessagePrioritizationHandler.handleAggregation(messages)
- }
-
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
- this, messagePrioritizationStateService
- )
- return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
- }
-}
-
-open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
- AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) {
-
- /** Child overriding this implementation , if necessary */
- override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
- val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
- this, messagePrioritizationStateService
- )
- sampleMessagePrioritizationHandler.handleAggregation(messages)
- }
-
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
- this, messagePrioritizationStateService
- )
- return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
- }
-}
-
-open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
- AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) {
-
- /** Child overriding this implementation , if necessary */
- override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
- val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
- this, messagePrioritizationStateService
- )
- sampleMessagePrioritizationHandler.handleAggregation(messages)
- }
-
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
- this, messagePrioritizationStateService
- )
- return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
- }
-}
-
-class SampleMessagePrioritizationHandler(
- private val messagePrioritizationService: MessagePrioritizationService,
- private val messagePrioritizationStateService: MessagePrioritizationStateService
-) {
-
- private val log = logger(SampleMessagePrioritizationHandler::class)
-
- suspend fun handleAggregation(messages: List<MessagePrioritization>) {
- log.info("messages(${messages.ids()}) aggregated")
- /** Sequence based on Priority and Updated Date */
- val sequencedMessage = messages.orderByHighestPriority()
- /** Update all messages to aggregated state */
- messagePrioritizationStateService.setMessagesState(
- sequencedMessage.ids(),
- MessageState.AGGREGATED.name
- )
- messagePrioritizationService.output(sequencedMessage)
- }
-
- fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return when (messagePrioritization.group) {
- /** Dummy Implementation, This can also be read from file and stored as cached map **/
- "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
- "pass-typed" -> arrayListOf(messagePrioritization.type)
- else -> null
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
deleted file mode 100644
index 7ab0be098..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-
-object MessageCorrelationUtils {
-
- /** Assumption is message is of same group **/
- fun correlatedMessages(collectedMessages: List<MessagePrioritization>): CorrelationCheckResponse {
- val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated")
- if (collectedMessages.size > 1) {
- val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() }
- if (filteredMessage.isNotEmpty()) {
- val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() }
- if (groupedMessage.size == 1) {
- correlationCheckResponse.correlated = true
- correlationCheckResponse.message = null
- }
- }
- } else {
- correlationCheckResponse.message = "received only one message for that group"
- }
- return correlationCheckResponse
- }
-
- /** Assumption is message is of same group and checking for required types **/
- fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?):
- CorrelationCheckResponse {
-
- return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
-
- val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id }
- if (!unknownMessageTypes.isNullOrEmpty()) {
- throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)")
- }
-
- val copyTypes = types.toTypedArray().copyOf().toMutableList()
-
- val filteredMessage = collectedMessages.filter {
- !it.correlationId.isNullOrBlank() &&
- types.contains(it.type)
- }
- var correlatedKeys: MutableSet<String> = mutableSetOf()
- if (filteredMessage.isNotEmpty()) {
- val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() }
- val foundType = correlatedMap.keys.map { it.type }
- copyTypes.removeAll(foundType)
- correlatedKeys = correlatedMap.keys.map {
- it.correlationId
- }.toMutableSet()
- }
- /** Check if any Types missing and same correlation id for all types */
- return if (copyTypes.isEmpty()) {
- if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true)
- else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)")
- } else {
- CorrelationCheckResponse(message = "couldn't find types($copyTypes)")
- }
- } else {
- return correlatedMessages(collectedMessages)
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
deleted file mode 100644
index 2c4ae30da..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
-
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
-import java.util.Date
-import java.util.UUID
-
-object MessagePrioritizationSample {
-
- fun samplePrioritizationConfiguration(): PrioritizationConfiguration {
- return PrioritizationConfiguration().apply {
- kafkaConfiguration = KafkaConfiguration().apply {
- inputTopicSelector = "prioritize-input"
- outputTopic = "prioritize-output-topic"
- expiredTopic = "prioritize-expired-topic"
- }
- natsConfiguration = NatsConfiguration().apply {
- connectionSelector = "cds-controller"
- inputSubject = "prioritize-input"
- outputSubject = "prioritize-output"
- expiredSubject = "prioritize-expired"
- }
- expiryConfiguration = ExpiryConfiguration().apply {
- frequencyMilli = 10000L
- maxPollRecord = 2000
- }
- shutDownConfiguration = ShutDownConfiguration().apply {
- waitMill = 2000L
- }
- cleanConfiguration = CleanConfiguration().apply {
- frequencyMilli = 10000L
- expiredRecordsHoldDays = 5
- }
- }
- }
-
- fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration {
- return PrioritizationConfiguration().apply {
- expiryConfiguration = ExpiryConfiguration().apply {
- frequencyMilli = 10L
- maxPollRecord = 2000
- }
- shutDownConfiguration = ShutDownConfiguration().apply {
- waitMill = 20L
- }
- cleanConfiguration = CleanConfiguration().apply {
- frequencyMilli = 10L
- expiredRecordsHoldDays = 5
- }
- }
- }
-
- private fun currentDatePlusDays(days: Int): Date {
- return controllerDate().addDate(days)
- }
-
- fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
- return sampleMessages("sample-group", messageState, count)
- }
-
- fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
- val messages: MutableList<MessagePrioritization> = arrayListOf()
- repeat(count) {
- val backPressureMessage = createMessage(
- groupName, messageState,
- "sample-type", null
- )
- messages.add(backPressureMessage)
- }
- return messages
- }
-
- fun sampleMessageWithSameCorrelation(
- groupName: String,
- messageState: String,
- count: Int
- ): List<MessagePrioritization> {
- val messages: MutableList<MessagePrioritization> = arrayListOf()
- repeat(count) {
- val backPressureMessage = createMessage(
- groupName, messageState, "sample-type",
- "key1=value1,key2=value2"
- )
- messages.add(backPressureMessage)
- }
- return messages
- }
-
- fun sampleMessageWithDifferentTypeSameCorrelation(
- groupName: String,
- messageState: String,
- count: Int
- ): List<MessagePrioritization> {
- val messages: MutableList<MessagePrioritization> = arrayListOf()
- repeat(count) {
- val backPressureMessage = createMessage(
- groupName, messageState, "type-$it",
- "key1=value1,key2=value2"
- )
- messages.add(backPressureMessage)
- }
- return messages
- }
-
- fun createMessage(
- groupName: String,
- messageState: String,
- messageType: String,
- messageCorrelationId: String?
- ): MessagePrioritization {
-
- return MessagePrioritization().apply {
- id = UUID.randomUUID().toString()
- group = groupName
- type = messageType
- state = messageState
- priority = (1..10).shuffled().first()
- correlationId = messageCorrelationId
- message = "I am the Message"
- createdDate = Date()
- updatedDate = Date()
- expiryDate = currentDatePlusDays(3)
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
deleted file mode 100644
index 86cec3697..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
-
-import org.apache.kafka.streams.processor.ProcessorSupplier
-import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-
-object MessageProcessorUtils {
-
- /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
- suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
- val clusterService = BluePrintDependencyService.optionalClusterService()
-
- return if (clusterService != null && clusterService.clusterJoined() &&
- !messagePrioritization.correlationId.isNullOrBlank()
- ) {
- // Get the correlation key in ascending order, even it it is misplaced
- val correlationId = messagePrioritization.toFormatedCorrelation()
- val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
- val clusterLock = clusterService.clusterLock(lockName)
- clusterLock.lock()
- if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
- clusterLock
- } else null
- }
-
- /** Utility to create the cluster lock for expiry scheduler*/
- suspend fun prioritizationExpiryLock(): ClusterLock? {
- val clusterService = BluePrintDependencyService.optionalClusterService()
- return if (clusterService != null && clusterService.clusterJoined()) {
- val lockName = "prioritize-expiry"
- val clusterLock = clusterService.clusterLock(lockName)
- clusterLock.lock()
- if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
- clusterLock
- } else null
- }
-
- /** Utility to create the cluster lock for expiry scheduler*/
- suspend fun prioritizationCleanLock(): ClusterLock? {
- val clusterService = BluePrintDependencyService.optionalClusterService()
- return if (clusterService != null && clusterService.clusterJoined()) {
- val lockName = "prioritize-clean"
- val clusterLock = clusterService.clusterLock(lockName)
- clusterLock.lock()
- if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
- clusterLock
- } else null
- }
-
- /** Utility used to cluster unlock for message [clusterLock] */
- suspend fun prioritizationUnLock(clusterLock: ClusterLock?) {
- if (clusterLock != null) {
- clusterLock.unLock()
- clusterLock.close()
- }
- }
-
- /** Get the Kafka Supplier for processor lookup [name] **/
- fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
- return ProcessorSupplier<K, V> {
- // Dynamically resolve the Prioritization Processor
- BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
deleted file mode 100644
index 286a9b5c1..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import io.mockk.coEvery
-import io.mockk.every
-import io.mockk.mockk
-import io.mockk.spyk
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
-import org.junit.Before
-import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
-import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
-import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
-import org.springframework.context.ApplicationContext
-import org.springframework.test.annotation.DirtiesContext
-import org.springframework.test.context.ContextConfiguration
-import org.springframework.test.context.TestPropertySource
-import org.springframework.test.context.junit4.SpringRunner
-import kotlin.test.Test
-import kotlin.test.assertNotNull
-
-@RunWith(SpringRunner::class)
-@DataJpaTest
-@DirtiesContext
-@ContextConfiguration(
- classes = [
- BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
- MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class
- ]
-)
-@TestPropertySource(
- properties =
- [
- "spring.jpa.show-sql=false",
- "spring.jpa.properties.hibernate.show_sql=false",
- "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
-
- "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
- "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
- "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
- "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
- "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
- "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
- "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
-
- // To send initial test message
- "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
- "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
- "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
- "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
- "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
- "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
- "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
- "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
-
- "blueprintsprocessor.nats.cds-controller.type=token-auth",
- "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
- "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
- ]
-)
-open class MessagePrioritizationConsumerTest {
-
- private val log = logger(MessagePrioritizationConsumerTest::class)
-
- @Autowired
- lateinit var applicationContext: ApplicationContext
-
- @Autowired
- lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
-
- @Autowired
- lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
-
- @Autowired
- lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
-
- @Autowired
- lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
-
- @Before
- fun setup() {
- BluePrintDependencyService.inject(applicationContext)
- }
-
- @Test
- fun testBluePrintKafkaJDBCKeyStore() {
- runBlocking {
- assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
-
- val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
- .instance(MessagePrioritizationStateService::class)
- assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
-
- MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
- val message = messagePrioritizationService.saveMessage(it)
- val repoResult = messagePrioritizationService.getMessage(message.id)
- assertNotNull(repoResult, "failed to get inserted message.")
- }
- }
- }
-
- @Test
- fun testMessagePrioritizationService() {
- runBlocking {
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
- val messagePrioritizationService =
- SampleMessagePrioritizationService(messagePrioritizationStateService)
- messagePrioritizationService.setConfiguration(configuration)
-
- log.info("**************** without Correlation **************")
- /** Checking without correlation */
- MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
- messagePrioritizationService.prioritize(it)
- }
- log.info("**************** Same Group , with Correlation **************")
- /** checking same group with correlation */
- MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
- .forEach {
- delay(10)
- messagePrioritizationService.prioritize(it)
- }
- log.info("**************** Different Type , with Correlation **************")
- /** checking different type, with correlation */
- MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
- .forEach {
- delay(10)
- messagePrioritizationService.prioritize(it)
- }
- }
- }
-
- @Test
- fun testStartConsuming() {
- runBlocking {
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
-
- val streamingConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
- assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
-
- val spyStreamingConsumerService = spyk(streamingConsumerService)
- coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
- coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService, mockk()
- )
- val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
-
- // Test Topology
- val kafkaStreamConsumerFunction =
- spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
- val messageConsumerProperties = bluePrintMessageLibPropertyService
- .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
- val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
- assertNotNull(topology, "failed to get create topology")
-
- every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
- spyMessagePrioritizationConsumer.startConsuming(configuration)
- spyMessagePrioritizationConsumer.shutDown()
- }
- }
-
- @Test
- fun testSchedulerService() {
- runBlocking {
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
- val messagePrioritizationService =
- SampleMessagePrioritizationService(messagePrioritizationStateService)
- messagePrioritizationService.setConfiguration(configuration)
-
- val messagePrioritizationSchedulerService =
- MessagePrioritizationSchedulerService(messagePrioritizationService)
- launch {
- messagePrioritizationSchedulerService.startScheduling()
- }
- launch {
- /** To debug increase the delay time */
- delay(20)
- messagePrioritizationSchedulerService.shutdownScheduling()
- }
- }
- }
-
- /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- // @Test
- fun testKafkaMessagePrioritizationConsumer() {
- runBlocking {
-
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
- val kafkaMessagePrioritizationService =
- SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
- kafkaMessagePrioritizationService.setConfiguration(configuration)
-
- val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
- messagePrioritizationStateService,
- kafkaMessagePrioritizationService
- )
-
- // Register the processor
- BluePrintDependencyService.registerSingleton(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- defaultMessagePrioritizeProcessor
- )
-
- val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService,
- kafkaMessagePrioritizationService
- )
- messagePrioritizationConsumer.startConsuming(configuration)
-
- /** Send sample message with every 1 sec */
- val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
- launch {
- MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
- delay(100)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(
- key = "mykey",
- message = it.asJsonString(false),
- headers = headers
- )
- }
-
- MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
- .forEach {
- delay(100)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(
- key = "mykey",
- message = it.asJsonString(false),
- headers = headers
- )
- }
-
- MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
- .forEach {
- delay(2000)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(
- key = "mykey",
- message = it.asJsonString(false),
- headers = headers
- )
- }
- }
- delay(10000)
- messagePrioritizationConsumer.shutDown()
- }
- }
-
- /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
- * Start :
- * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
- * */
- // @Test
- fun testNatsMessagePrioritizationConsumer() {
- runBlocking {
- val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
- assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
-
- val inputSubject =
- NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
-
- val natsMessagePrioritizationService =
- SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
- natsMessagePrioritizationService.setConfiguration(configuration)
-
- val messagePrioritizationConsumer =
- NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
- messagePrioritizationConsumer.startConsuming()
-
- /** Send sample message with every 1 sec */
- val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
-
- launch {
- MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
- delay(100)
- bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
- }
-
- MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
- .forEach {
- delay(100)
- bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
- }
-
- MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
- .forEach {
- delay(200)
- bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
- }
- }
- delay(3000)
- messagePrioritizationConsumer.shutDown()
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
deleted file mode 100644
index 22c399608..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-
-import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.Configuration
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
-import org.springframework.stereotype.Service
-import javax.sql.DataSource
-
-@Configuration
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"])
-@EnableAutoConfiguration
-open class TestDatabaseConfiguration {
-
- @Bean("primaryDBLibGenericService")
- open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService {
- return PrimaryDBLibGenericService(
- NamedParameterJdbcTemplate(dataSource)
- )
- }
-}
-
-/* Sample Prioritization Listener, used during Application startup
-@Component
-open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) {
-
- private val log = logger(SamplePrioritizationListeners::class)
-
- @EventListener(ApplicationReadyEvent::class)
- open fun init() = runBlocking {
- log.info("Starting PrioritizationListeners...")
- defaultMessagePrioritizationConsumer
- .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
- }
-
- @PreDestroy
- open fun destroy() = runBlocking {
- log.info("Shutting down PrioritizationListeners...")
- defaultMessagePrioritizationConsumer.shutDown()
- }
-}
- */
-
-@Service
-open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
- SampleMessagePrioritizationService(messagePrioritizationStateService)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
deleted file mode 100644
index 73d3738e5..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
-
-import org.junit.Test
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
-import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
-
-class MessageCorrelationUtilsTest {
-
- @Test
- fun testCorrelationKeysReordered() {
-
- val message1 = MessagePrioritizationSample.createMessage(
- "sample-group", MessageState.NEW.name,
- "type-0", "key1=value1,key2=value2"
- )
- val message2 = MessagePrioritizationSample.createMessage(
- "sample-group", MessageState.NEW.name,
- "type-0", "key2=value2,key1=value1"
- )
-
- val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
- multipleMessages.add(message1)
- multipleMessages.add(message2)
- val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages)
- assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered")
- }
-
- @Test
- fun differentTypesWithSameCorrelationMessages() {
- /** With Types **/
- /* Assumption is Same group with different types */
- val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
- val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithSameCorrelationMessages,
- arrayListOf("type-0", "type-1", "type-2")
- )
- assertTrue(
- differentTypesWithSameCorrelationMessagesResponse.correlated,
- "failed to correlate differentTypesWithSameCorrelationMessagesResponse"
- )
-
- /* Assumption is Same group with different types and one missing expected types,
- In this case type-3 message is missing */
- val differentTypesWithSameCorrelationMessagesResWithMissingType =
- MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithSameCorrelationMessages,
- arrayListOf("type-0", "type-1", "type-2", "type-3")
- )
- assertTrue(
- !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
- "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType"
- )
- }
-
- @Test
- fun withSameCorrelationMessagesWithIgnoredTypes() {
- /** With ignoring Types */
- /** Assumption is only one message received */
- val withSameCorrelationOneMessages = MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
- val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
- withSameCorrelationOneMessages, null
- )
- assertTrue(
- !withSameCorrelationOneMessagesResp.correlated,
- "failed to correlate withSameCorrelationMessagesResp"
- )
-
- /** Assumption is two message received for same group with same correlation */
- val withSameCorrelationMessages = MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
- val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
- withSameCorrelationMessages, null
- )
- assertTrue(
- withSameCorrelationMessagesResp.correlated,
- "failed to correlate withSameCorrelationMessagesResp"
- )
- }
-
- @Test
- fun differentTypesWithDifferentCorrelationMessage() {
- /** Assumption is two message received for same group with different expected types and different correlation */
- val message1 = MessagePrioritizationSample.createMessage(
- "sample-group", MessageState.NEW.name,
- "type-0", "key1=value1,key2=value2"
- )
- val message2 = MessagePrioritizationSample.createMessage(
- "sample-group", MessageState.NEW.name,
- "type-1", "key1=value1,key2=value3"
- )
- val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
- differentTypesWithDifferentCorrelationMessage.add(message1)
- differentTypesWithDifferentCorrelationMessage.add(message2)
- val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithDifferentCorrelationMessage,
- arrayListOf("type-0", "type-1")
- )
- assertTrue(
- !differentTypesWithDifferentCorrelationMessageResp.correlated,
- "failed to correlate differentTypesWithDifferentCorrelationMessageResp"
- )
- }
-
- @Test
- fun testPrioritizationOrdering() {
- val differentPriorityMessages = MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5)
- val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority()
- assertNotNull(orderedPriorityMessages, "failed to order the priority messages")
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
deleted file mode 100644
index e3a1f7a01..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<!--
- ~ Copyright © 2018-2019 AT&T Intellectual Property.
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
-
- <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
- <property name="defaultPattern"
- value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
- <property name="testing"
- value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <!-- encoders are assigned the type
- ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
- <encoder>
- <pattern>${localPattern}</pattern>
- </encoder>
- </appender>
-
- <logger name="org.springframework.test" level="warn"/>
- <logger name="org.springframework" level="warn"/>
- <logger name="org.hibernate.type.descriptor.sql" level="warn"/>
- <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
-
- <root level="warn">
- <appender-ref ref="STDOUT"/>
- </root>
-
-</configuration>