From 5844724ca96d08c3b752effdb10fd2586755912d Mon Sep 17 00:00:00 2001 From: "Singal, Kapil (ks220y)" Date: Tue, 15 Dec 2020 19:02:17 -0500 Subject: Fixing typo in message-prioritization Refactoring few POMs name tag Issue-ID: CCSDK-3053 Signed-off-by: Singal, Kapil (ks220y) Change-Id: I14447ea7f93efcc970213bbe7d42663cb87e33d7 --- .../functions/message-prioritizaion/README.md | 26 -- .../functions/message-prioritizaion/pom.xml | 43 --- .../MessagePrioritizationConfiguration.kt | 33 -- .../prioritization/MessagePrioritizationData.kt | 89 ------ .../prioritization/MessagePrioritizationService.kt | 37 --- .../MessagePrioritizationStateService.kt | 72 ----- .../prioritization/MessagePrioritizeExtensions.kt | 67 ---- .../prioritization/api/MessagePrioritizationApi.kt | 80 ----- .../prioritization/db/MessagePrioritization.kt | 89 ------ .../db/PrioritizationMessageRepository.kt | 175 ----------- .../AbstractKafkaMessagePrioritizationService.kt | 84 ----- .../kafka/AbstractMessagePrioritizeProcessor.kt | 28 -- .../kafka/DefaultMessagePrioritizeProcessor.kt | 78 ----- .../kafka/KafkaMessagePrioritizationConsumer.kt | 108 ------- .../kafka/MessagePrioritizationSerde.kt | 64 ---- .../AbstractNatsMessagePrioritizationService.kt | 85 ----- .../nats/NatsMessagePrioritizationConsumer.kt | 92 ------ .../AbstractMessagePrioritizationService.kt | 203 ------------ .../MessagePrioritizationSchedulerService.kt | 98 ------ .../MessagePrioritizationStateServiceImpl.kt | 176 ----------- .../service/SampleMessagePrioritizationService.kt | 120 ------- .../utils/MessageCorrelationUtils.kt | 82 ----- .../utils/MessagePrioritizationSample.kt | 148 --------- .../prioritization/utils/MessageProcessorUtils.kt | 86 ----- .../MessagePrioritizationConsumerTest.kt | 350 --------------------- .../message/prioritization/TestConfiguration.kt | 65 ---- .../utils/MessageCorrelationUtilsTest.kt | 132 -------- .../src/test/resources/logback-test.xml | 42 --- .../functions/message-prioritization/README.md | 26 ++ .../functions/message-prioritization/pom.xml | 43 +++ .../MessagePrioritizationConfiguration.kt | 33 ++ .../prioritization/MessagePrioritizationData.kt | 89 ++++++ .../prioritization/MessagePrioritizationService.kt | 37 +++ .../MessagePrioritizationStateService.kt | 72 +++++ .../prioritization/MessagePrioritizeExtensions.kt | 67 ++++ .../prioritization/api/MessagePrioritizationApi.kt | 80 +++++ .../prioritization/db/MessagePrioritization.kt | 89 ++++++ .../db/PrioritizationMessageRepository.kt | 175 +++++++++++ .../AbstractKafkaMessagePrioritizationService.kt | 84 +++++ .../kafka/AbstractMessagePrioritizeProcessor.kt | 28 ++ .../kafka/DefaultMessagePrioritizeProcessor.kt | 78 +++++ .../kafka/KafkaMessagePrioritizationConsumer.kt | 108 +++++++ .../kafka/MessagePrioritizationSerde.kt | 64 ++++ .../AbstractNatsMessagePrioritizationService.kt | 85 +++++ .../nats/NatsMessagePrioritizationConsumer.kt | 92 ++++++ .../AbstractMessagePrioritizationService.kt | 203 ++++++++++++ .../MessagePrioritizationSchedulerService.kt | 98 ++++++ .../MessagePrioritizationStateServiceImpl.kt | 176 +++++++++++ .../service/SampleMessagePrioritizationService.kt | 120 +++++++ .../utils/MessageCorrelationUtils.kt | 82 +++++ .../utils/MessagePrioritizationSample.kt | 148 +++++++++ .../prioritization/utils/MessageProcessorUtils.kt | 86 +++++ .../MessagePrioritizationConsumerTest.kt | 350 +++++++++++++++++++++ .../message/prioritization/TestConfiguration.kt | 65 ++++ .../utils/MessageCorrelationUtilsTest.kt | 132 ++++++++ .../src/test/resources/logback-test.xml | 42 +++ ms/blueprintsprocessor/functions/pom.xml | 2 +- .../modules/inbounds/configs-api/pom.xml | 2 +- .../modules/inbounds/designer-api/pom.xml | 2 +- .../modules/inbounds/health-api-common/pom.xml | 2 +- .../modules/inbounds/health-api/pom.xml | 2 +- .../modules/inbounds/resource-api/pom.xml | 2 +- .../modules/inbounds/selfservice-api/pom.xml | 2 +- ms/blueprintsprocessor/parent/pom.xml | 2 +- 64 files changed, 2760 insertions(+), 2760 deletions(-) delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/README.md delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/README.md create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/pom.xml create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml (limited to 'ms/blueprintsprocessor') diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md deleted file mode 100644 index cda43faca..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md +++ /dev/null @@ -1,26 +0,0 @@ - -To Delete Topics ------------------- -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic -kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog - -Create Topics --------------- - -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic - -To List topics ----------------- -kafka-topics --list --bootstrap-server localhost:9092 - -To publish message --------------------- -kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic - -To Listen for Output ----------------------- -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml deleted file mode 100644 index e8467d02c..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - - 4.0.0 - - - org.onap.ccsdk.cds.blueprintsprocessor - blueprintsprocessor-functions - 1.1.0-SNAPSHOT - - - org.onap.ccsdk.cds.blueprintsprocessor.functions - message-prioritizaion - - MS Blueprints Processor Functions - Message Prioritization - Blueprints Processor Function - Message Prioritization - - - - org.onap.ccsdk.cds.blueprintsprocessor.modules - message-lib - - - com.h2database - h2 - - - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt deleted file mode 100644 index 890e0a6ba..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration - -@Configuration -@ComponentScan -open class MessagePrioritizationConfiguration - -object MessagePrioritizationConstants { - - const val SOURCE_INPUT = "source-prioritization-input" - - const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" - - const val SINK_OUTPUT = "sink-prioritization-output" -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt deleted file mode 100644 index 65b7644a8..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import java.io.Serializable - -object MessageActionConstants { - - const val PRIORITIZE = "prioritize" -} - -enum class MessageState(val id: String) { - NEW("new"), - WAIT("wait"), - EXPIRED("expired"), - PRIORITIZED("prioritized"), - AGGREGATED("aggregated"), - COMPLETED("completed"), - ERROR("error") -} - -open class PrioritizationConfiguration : Serializable { - - lateinit var expiryConfiguration: ExpiryConfiguration - lateinit var shutDownConfiguration: ShutDownConfiguration - lateinit var cleanConfiguration: CleanConfiguration - var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration - var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration -} - -open class KafkaConfiguration : Serializable { - - lateinit var inputTopicSelector: String // Consumer Configuration Selector - lateinit var expiredTopic: String // Publish Configuration Selector - lateinit var outputTopic: String // Publish Configuration Selector -} - -open class NatsConfiguration : Serializable { - - lateinit var connectionSelector: String // Consumer Configuration Selector - lateinit var inputSubject: String // Publish Configuration Selector - lateinit var expiredSubject: String // Publish Configuration Selector - lateinit var outputSubject: String // Publish Configuration Selector -} - -open class ExpiryConfiguration : Serializable { - - var frequencyMilli: Long = 30000L - var maxPollRecord: Int = 1000 -} - -open class ShutDownConfiguration : Serializable { - - var waitMill: Long = 30000L -} - -open class CleanConfiguration : Serializable { - - var frequencyMilli: Long = 30000L - var expiredRecordsHoldDays: Int = 5 -} - -open class UpdateStateRequest : Serializable { - - lateinit var id: String - var group: String? = null - var state: String? = null -} - -data class CorrelationCheckResponse( - var message: String? = null, - var correlated: Boolean = false -) - -data class TypeCorrelationKey(val type: String, val correlationId: String) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt deleted file mode 100644 index dfe516953..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization - -interface MessagePrioritizationService { - - fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) - - fun getConfiguration(): PrioritizationConfiguration - - suspend fun prioritize(messagePrioritization: MessagePrioritization) - - /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ - suspend fun output(messages: List) - - /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ - suspend fun updateExpiredMessages() - - /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ - suspend fun cleanExpiredMessage() -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt deleted file mode 100644 index 2e5e6c617..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.Date - -interface MessagePrioritizationStateService { - - suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization - - suspend fun getMessage(id: String): MessagePrioritization - - suspend fun getMessages(ids: List): List? - - suspend fun getExpiryEligibleMessages(count: Int): List? - - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): - List? - - suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): - List? - - suspend fun getExpiredMessages(expiryDate: Date, count: Int): List? - - suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? - - suspend fun getCorrelatedMessages( - group: String, - states: List, - types: List?, - correlationIds: String - ): List? - - suspend fun updateMessagesState(ids: List, state: String) - - suspend fun updateMessageState(id: String, state: String): MessagePrioritization - - suspend fun setMessageState(id: String, state: String) - - suspend fun setMessagesPriority(ids: List, priority: String) - - suspend fun setMessagesState(ids: List, state: String) - - suspend fun setMessageStateANdError(id: String, state: String, error: String) - - suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) - - suspend fun deleteMessage(id: String) - - suspend fun deleteMessages(id: List) - - suspend fun deleteExpiredMessage(retentionDays: Int) - - suspend fun deleteMessageByGroup(group: String) - - suspend fun deleteMessageStates(group: String, states: List) -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt deleted file mode 100644 index d8e71d413..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - -/** - * Register the MessagePrioritizationStateService and exposed dependency - */ -fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = - instance(MessagePrioritizationStateService::class) - -/** - * Expose messagePrioritizationStateService to AbstractComponentFunction - */ -fun AbstractComponentFunction.messagePrioritizationStateService() = - BluePrintDependencyService.messagePrioritizationStateService() - -/** - * MessagePrioritization correlation extensions - */ - -/** - * Arrange comma separated correlation keys in ascending order. - */ -fun MessagePrioritization.toFormatedCorrelation(): String { - return this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") -} - -/** - * Used to group the correlation with respect to types. - */ -fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { - return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) -} - -/** get list of message ids **/ -fun List.ids(): List { - return this.map { it.id } -} - -/** Ordered by highest priority and updated date **/ -fun List.orderByHighestPriority(): List { - return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate)) -} - -/** Ordered by Updated date **/ -fun List.orderByUpdatedDate(): List { - return this.sortedWith(compareBy(MessagePrioritization::updatedDate)) -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt deleted file mode 100644 index c7aab03b6..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.mdcWebCoroutineScope -import org.springframework.http.MediaType -import org.springframework.web.bind.annotation.GetMapping -import org.springframework.web.bind.annotation.PathVariable -import org.springframework.web.bind.annotation.PostMapping -import org.springframework.web.bind.annotation.RequestBody -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.ResponseBody -import org.springframework.web.bind.annotation.RestController - -@RestController -@RequestMapping(value = ["/api/v1/message-prioritization"]) -open class MessagePrioritizationApi( - private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val messagePrioritizationService: MessagePrioritizationService -) { - - @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) - @ResponseBody - suspend fun ping(): String = mdcWebCoroutineScope { "Success" } - - @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) - @ResponseBody - suspend fun messagePrioritization(@PathVariable(value = "id") id: String) = mdcWebCoroutineScope { - messagePrioritizationStateService.getMessage(id) - } - - @PostMapping( - path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE] - ) - @ResponseBody - suspend fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = - mdcWebCoroutineScope { - messagePrioritizationStateService.saveMessage(messagePrioritization) - } - - @PostMapping( - path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE] - ) - @ResponseBody - suspend fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = mdcWebCoroutineScope { - messagePrioritizationService.prioritize(messagePrioritization) - } - - @PostMapping( - path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE] - ) - suspend fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = - mdcWebCoroutineScope { - messagePrioritizationStateService.setMessageState( - updateMessageState.id, - updateMessageState.state!! - ) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt deleted file mode 100644 index ce2085f68..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import com.fasterxml.jackson.annotation.JsonFormat -import org.hibernate.annotations.Proxy -import org.springframework.data.annotation.LastModifiedDate -import org.springframework.data.jpa.domain.support.AuditingEntityListener -import org.springframework.data.jpa.repository.config.EnableJpaAuditing -import java.util.Date -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.EntityListeners -import javax.persistence.Id -import javax.persistence.Lob -import javax.persistence.Table -import javax.persistence.Temporal -import javax.persistence.TemporalType - -@EnableJpaAuditing -@EntityListeners(AuditingEntityListener::class) -@Entity -@Table(name = "MESSAGE_PRIORITIZATION") -@Proxy(lazy = false) -open class MessagePrioritization { - - @Id - @Column(name = "message_id", length = 50) - lateinit var id: String - - @Column(name = "message_group", length = 50, nullable = false) - lateinit var group: String - - @Column(name = "message_type", length = 50, nullable = false) - lateinit var type: String - - /** States Defined by MessageState */ - @Column(name = "message_state", length = 20, nullable = false) - lateinit var state: String - - @Column(name = "priority", nullable = false) - var priority: Int = 5 - - @Lob - @Column(name = "message", nullable = false) - var message: String? = null - - @Lob - @Column(name = "error", nullable = true) - var error: String? = null - - @Lob - @Column(name = "aggregated_message_ids", nullable = true) - var aggregatedMessageIds: String? = null - - @Lob - @Column(name = "correlation_id", nullable = true) - var correlationId: String? = null - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "created_date", nullable = false) - var createdDate = Date() - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @LastModifiedDate - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "updated_date", nullable = false) - var updatedDate: Date? = null - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "expiry_date", nullable = false) - var expiryDate: Date? = null -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt deleted file mode 100644 index 0b35e3856..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import org.springframework.data.domain.Pageable -import org.springframework.data.jpa.repository.JpaRepository -import org.springframework.data.jpa.repository.Modifying -import org.springframework.data.jpa.repository.Query -import org.springframework.stereotype.Repository -import org.springframework.transaction.annotation.Transactional -import java.util.Date - -@Repository -@Transactional(readOnly = true) -interface PrioritizationMessageRepository : JpaRepository { - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") - fun findByGroup(group: String, count: Pageable): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.createdDate asc" - ) - fun findByGroupAndStateIn(group: String, states: List, count: Pageable): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.updatedDate asc" - ) - fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List, count: Pageable): - List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByGroupAndStateInAndNotExpiredDate( - group: String, - states: List, - expiryCheckDate: Date, - count: Pageable - ): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByStateInAndExpiredDate( - states: List, - expiryCheckDate: Date, - count: Pageable - ): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByGroupAndStateInAndExpiredDate( - group: String, - states: List, - expiryCheckDate: Date, - count: Pageable - ): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" - ) - fun findByGroupAndCorrelationId(group: String, states: List, correlationId: String): - List? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" - ) - fun findByGroupAndTypesAndCorrelationId( - group: String, - states: List, - types: List, - correlationId: String - ): List? - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id = :id" - ) - fun setStateForMessageId(id: String, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id = :id" - ) - fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id IN :ids" - ) - fun setStateForMessageIds(ids: List, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id IN :ids" - ) - fun setPriorityForMessageIds(ids: List, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + - "WHERE id = :id" - ) - fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, " + - "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" - ) - fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE id IN :ids") - fun deleteByIds(ids: List) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ") - fun deleteByExpiryDate(expiryCheckDate: Date) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE group = :group") - fun deleteGroup(group: String) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states") - fun deleteGroupAndStateIn(group: String, states: List) -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt deleted file mode 100644 index 112a80379..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -abstract class AbstractKafkaMessagePrioritizationService( - private val messagePrioritizationStateService: MessagePrioritizationStateService -) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - - private val log = logger(AbstractKafkaMessagePrioritizationService::class) - - lateinit var processorContext: ProcessorContext - - fun setKafkaProcessorContext(processorContext: ProcessorContext) { - this.processorContext = processorContext - } - - override suspend fun output(messages: List) { - log.info("$$$$$ received in output processor id(${messages.ids()})") - checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } - check(::processorContext.isInitialized) { "failed to initialize kafka processor " } - - messages.forEach { message -> - val updatedMessage = - messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - processorContext.forward( - updatedMessage.id, - updatedMessage, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - - override suspend fun updateExpiredMessages() { - checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } - check(::processorContext.isInitialized) { "failed to initialize kafka processor " } - - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() - try { - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - val expiredIds = fetchMessages?.ids() - if (expiredIds != null && expiredIds.isNotEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - fetchMessages.forEach { expiredMessage -> - expiredMessage.state = MessageState.EXPIRED.name - processorContext.forward( - expiredMessage.id, expiredMessage, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - } catch (e: Exception) { - log.error("failed in updating expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt deleted file mode 100644 index d4f8470c8..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor - -/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ -abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { - - override fun init(processorContext: ProcessorContext) { - this.processorContext = processorContext - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt deleted file mode 100644 index 1b0612492..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils - -open class DefaultMessagePrioritizeProcessor( - private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val kafkaMessagePrioritizationService: MessagePrioritizationService -) : AbstractMessagePrioritizeProcessor() { - - private val log = logger(DefaultMessagePrioritizeProcessor::class) - - override suspend fun processNB(key: ByteArray, value: ByteArray) { - - val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - try { - kafkaMessagePrioritizationService.prioritize(messagePrioritize) - } catch (e: Exception) { - messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" - log.error(messagePrioritize.error) - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritize.id, MessageState.ERROR.name, - messagePrioritize.error!! - ) - /** Publish to Output topic */ - this.processorContext.forward( - messagePrioritize.id, messagePrioritize, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - - override fun init(context: ProcessorContext) { - super.init(context) - /** Set Configuration and Processor Context to messagePrioritizationService */ - if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { - kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) - } else { - throw BluePrintProcessorException( - "messagePrioritizationService is not instance of " + - "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" - ) - } - } - - override fun close() { - log.info( - "closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt deleted file mode 100644 index 4ab399f54..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList - -open class KafkaMessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, - private val kafkaMessagePrioritizationService: MessagePrioritizationService -) { - - private val log = logger(KafkaMessagePrioritizationConsumer::class) - - private lateinit var streamingConsumerService: BlueprintMessageConsumerService - - open fun consumerService(selector: String): BlueprintMessageConsumerService { - return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) - } - - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): - KafkaStreamConsumerFunction { - return object : KafkaStreamConsumerFunction { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map? - ): Topology { - - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() - log.info("Consuming prioritization topics($topics)") - - topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ), - MessagePrioritizationConstants.SOURCE_INPUT - ) - - /** To receive completed and error messages */ - topology.addSink( - MessagePrioritizationConstants.SINK_OUTPUT, - kafkaConsumerConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - // Output will be sent to the group-output topic from Processor API - return topology - } - } - } - - suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) - - // Dynamic Consumer Function to create Topology - val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) - streamingConsumerService.consume(null, consumerFunction) - } - - suspend fun shutDown() { - if (::streamingConsumerService.isInitialized) { - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt deleted file mode 100644 index 5595863d4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.common.serialization.Deserializer -import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.common.serialization.Serializer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.nio.charset.Charset - -open class MessagePrioritizationSerde : Serde { - - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun close() { - } - - override fun deserializer(): Deserializer { - return object : Deserializer { - override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { - return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - } - - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun close() { - } - } - } - - override fun serializer(): Serializer { - return object : Serializer { - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { - return data.asJsonString().toByteArray(Charset.defaultCharset()) - } - - override fun close() { - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt deleted file mode 100644 index 502a7822d..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils -import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -abstract class AbstractNatsMessagePrioritizationService( - private val messagePrioritizationStateService: MessagePrioritizationStateService -) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - - private val log = logger(AbstractNatsMessagePrioritizationService::class) - - lateinit var bluePrintNatsService: BluePrintNatsService - - override suspend fun output(messages: List) { - log.info("$$$$$ received in output processor id(${messages.ids()})") - checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } - check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } - - val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject - messages.forEach { message -> - val updatedMessage = - messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - - /** send to the output subject */ - bluePrintNatsService.publish( - NatsClusterUtils.currentApplicationSubject(outputSubject), - updatedMessage.asJsonType().asByteArray() - ) - } - } - - override suspend fun updateExpiredMessages() { - checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } - check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } - - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject - val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() - try { - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - val expiredIds = fetchMessages?.ids() - if (!expiredIds.isNullOrEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - fetchMessages.forEach { expiredMessage -> - expiredMessage.state = MessageState.EXPIRED.name - /** send to the output subject */ - bluePrintNatsService.publish( - NatsClusterUtils.currentApplicationSubject(outputSubject), - expiredMessage.asJsonType().asByteArray() - ) - } - } - } catch (e: Exception) { - log.error("failed in updating expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt deleted file mode 100644 index a0b2cf462..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats - -import io.nats.streaming.MessageHandler -import io.nats.streaming.Subscription -import kotlinx.coroutines.runBlocking -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.asType -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils - -open class NatsMessagePrioritizationConsumer( - private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, - private val natsMessagePrioritizationService: MessagePrioritizationService -) { - - private val log = logger(NatsMessagePrioritizationConsumer::class) - - lateinit var bluePrintNatsService: BluePrintNatsService - private lateinit var subscription: Subscription - - suspend fun startConsuming() { - val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() - val natsConfiguration = prioritizationConfiguration.natsConfiguration - ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") - - check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { - "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." - } - bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) - natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService - val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) - val loadBalanceGroup = ClusterUtils.applicationName() - val messageHandler = createMessageHandler() - val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) - subscription = bluePrintNatsService.loadBalanceSubscribe( - inputSubject, - loadBalanceGroup, - messageHandler, - subscriptionOptions - ) - log.info( - "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." - ) - } - - suspend fun shutDown() { - if (::subscription.isInitialized) { - subscription.unsubscribe() - } - log.info("Nats prioritization consumer listener shutdown complete") - } - - private fun consumerService(selector: String): BluePrintNatsService { - return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) - } - - private fun createMessageHandler(): MessageHandler { - return MessageHandler { message -> - try { - val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) - runBlocking { - natsMessagePrioritizationService.prioritize(messagePrioritization) - } - } catch (e: Exception) { - log.error("failed to process prioritize message", e) - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt deleted file mode 100644 index f4602a810..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/ -abstract class AbstractMessagePrioritizationService( - private val messagePrioritizationStateService: MessagePrioritizationStateService -) : MessagePrioritizationService { - - private val log = logger(AbstractMessagePrioritizationService::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - - override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { - this.prioritizationConfiguration = prioritizationConfiguration - } - - override fun getConfiguration(): PrioritizationConfiguration { - return this.prioritizationConfiguration - } - - override suspend fun prioritize(messagePrioritize: MessagePrioritization) { - try { - log.info("***** received in prioritize processor key(${messagePrioritize.id})") - check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } - - /** Get the cluster lock for message group */ - val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) - // Save the Message - messagePrioritizationStateService.saveMessage(messagePrioritize) - handleCorrelationAndNextStep(messagePrioritize) - /** Cluster unLock for message group */ - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } catch (e: Exception) { - messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" - log.error(messagePrioritize.error) - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritize.id, MessageState.ERROR.name, - messagePrioritize.error!! - ) - } - } - - override suspend fun output(messages: List) { - log.info("$$$$$ received in output processor id(${messages.ids()})") - messages.forEach { message -> - messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - } - } - - override suspend fun updateExpiredMessages() { - check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } - - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() - try { - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - val expiredIds = fetchMessages?.ids() - if (!expiredIds.isNullOrEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - } - } catch (e: Exception) { - log.error("failed in updating expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } - - override suspend fun cleanExpiredMessage() { - check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } - - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - val clusterLock = MessageProcessorUtils.prioritizationCleanLock() - try { - messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) - } catch (e: Exception) { - log.error("failed in clean expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } - - open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { - /** Check correlation enabled and correlation field has populated */ - if (!messagePrioritization.correlationId.isNullOrBlank()) { - val id = messagePrioritization.id - val group = messagePrioritization.group - val correlationId = messagePrioritization.correlationId!! - val types = getGroupCorrelationTypes(messagePrioritization) - log.info( - "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " + - "correlation types($types), priority(${messagePrioritization.priority}), " + - "correlation id($correlationId)" - ) - - /** Get all previously received messages from database for group and optional types and correlation Id */ - val waitingCorrelatedStoreMessages = messagePrioritizationStateService - .getCorrelatedMessages( - group, - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId - ) - - /** If multiple records found, then check correlation */ - if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { - /** Check all correlation satisfies */ - val correlationResults = MessageCorrelationUtils - .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) - - if (correlationResults.correlated) { - /** Update all messages to Aggregated state */ - messagePrioritizationStateService.setMessagesState( - waitingCorrelatedStoreMessages.ids(), - MessageState.PRIORITIZED.name - ) - /** Correlation satisfied, Send only correlated messages to aggregate processor */ - aggregate(waitingCorrelatedStoreMessages) - } else { - /** Correlation not satisfied */ - log.trace("correlation not matched : ${correlationResults.message}") - // Update the Message state to Wait - messagePrioritizationStateService.setMessagesState( - waitingCorrelatedStoreMessages.ids(), - MessageState.WAIT.name - ) - } - } else { - /** received first message of group and correlation Id, update the message with wait state */ - messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) - } - } else { - /** No Correlation check needed, simply forward to next processor. */ - messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) - aggregate(arrayListOf(messagePrioritization)) - } - } - - open suspend fun aggregate(messages: List) { - log.info("@@@@@ received in aggregation processor ids(${messages.ids()}") - if (!messages.isNullOrEmpty()) { - try { - /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ - handleAggregation(messages) - } catch (e: Exception) { - val error = "failed in aggregate message(${messages.ids()}) : ${e.message}" - if (!messages.isNullOrEmpty()) { - messages.forEach { messagePrioritization -> - try { - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritization.id, - MessageState.ERROR.name, error - ) - } catch (sendException: Exception) { - log.error( - "failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", - e - ) - } - } - /** Publish to output topic */ - output(messages) - } - } - } - } - - /** Child will override this implementation , if necessary - * Here the place child has to implement custom Sequencing and Aggregation logic. - * */ - abstract suspend fun handleAggregation(messages: List) - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt deleted file mode 100644 index 529d773a4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.springframework.stereotype.Service - -@Service -open class MessagePrioritizationSchedulerService( - private val messagePrioritizationService: MessagePrioritizationService -) { - - private val log = logger(MessagePrioritizationSchedulerService::class) - - @Volatile - var keepGoing = true - - /** This is sample scheduler implementation used during starting application with configuration. - @EventListener(ApplicationReadyEvent::class) - open fun init() = runBlocking { - log.info("Starting PrioritizationListeners...") - startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) - } - */ - - open suspend fun startScheduling() { - val prioritizationConfiguration = messagePrioritizationService.getConfiguration() - - log.info("Starting Prioritization Scheduler Service...") - GlobalScope.launch { - expiryScheduler(prioritizationConfiguration) - } - GlobalScope.launch { - cleanUpScheduler(prioritizationConfiguration) - } - } - - open suspend fun shutdownScheduling() { - keepGoing = false - val prioritizationConfiguration = messagePrioritizationService.getConfiguration() - delay(prioritizationConfiguration.shutDownConfiguration.waitMill) - } - - private suspend fun expiryScheduler( - prioritizationConfiguration: PrioritizationConfiguration - ) { - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec") - withContext(Dispatchers.Default) { - while (keepGoing) { - try { - messagePrioritizationService.updateExpiredMessages() - delay(expiryConfiguration.frequencyMilli) - } catch (e: Exception) { - log.error("failed in prioritization expiry scheduler", e) - } - } - } - } - - private suspend fun cleanUpScheduler( - prioritizationConfiguration: PrioritizationConfiguration - ) { - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec") - withContext(Dispatchers.Default) { - while (keepGoing) { - try { - messagePrioritizationService.cleanExpiredMessage() - delay(cleanConfiguration.frequencyMilli) - } catch (e: Exception) { - log.error("failed in prioritization clean scheduler", e) - } - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt deleted file mode 100644 index ed16fd44f..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate -import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate -import org.springframework.data.domain.PageRequest -import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import java.util.Date - -@Service -open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository -) : MessagePrioritizationStateService { - - private val log = logger(MessagePrioritizationStateServiceImpl::class) - - @Transactional - override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { - if (!message.correlationId.isNullOrBlank()) { - message.correlationId = message.toFormatedCorrelation() - } - message.updatedDate = Date() - return prioritizationMessageRepository.save(message) - } - - override suspend fun getMessage(id: String): MessagePrioritization { - return prioritizationMessageRepository.findById(id).orElseGet(null) - ?: throw BluePrintProcessorException("couldn't find message for id($id)") - } - - override suspend fun getMessages(ids: List): List? { - return prioritizationMessageRepository.findAllById(ids) - } - - override suspend fun getExpiryEligibleMessages(count: Int): List? { - return prioritizationMessageRepository - .findByStateInAndExpiredDate( - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), - Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): - List? { - return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): - List? { - return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List? { - return prioritizationMessageRepository.findByExpiredDate( - expiryDate, PageRequest.of(0, count) - ) - } - - override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): - List? { - return prioritizationMessageRepository.findByGroupAndExpiredDate( - group, - expiryDate, PageRequest.of(0, count) - ) - } - - override suspend fun getCorrelatedMessages( - group: String, - states: List, - types: List?, - correlationIds: String - ): List? { - return if (!types.isNullOrEmpty()) { - prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) - } else { - prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) - } - } - - @Transactional - override suspend fun updateMessagesState(ids: List, state: String) { - ids.forEach { - val updated = updateMessageState(it, state) - log.info("message($it) update to state(${updated.state})") - } - } - - @Transactional - override suspend fun setMessageState(id: String, state: String) { - prioritizationMessageRepository.setStateForMessageId(id, state, Date()) - } - - @Transactional - override suspend fun setMessagesPriority(ids: List, priority: String) { - prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date()) - } - - @Transactional - override suspend fun setMessagesState(ids: List, state: String) { - prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) - } - - @Transactional - override suspend fun setMessageStateANdError(id: String, state: String, error: String) { - prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) - } - - @Transactional - override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { - val updateMessage = getMessage(id).apply { - this.updatedDate = Date() - this.state = state - } - return saveMessage(updateMessage) - } - - @Transactional - override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) { - val groupedIds = aggregatedIds.joinToString(",") - prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) - } - - override suspend fun deleteMessage(id: String) { - prioritizationMessageRepository.deleteById(id) - log.info("Prioritization Messages $id deleted successfully.") - } - - override suspend fun deleteMessages(ids: List) { - prioritizationMessageRepository.deleteByIds(ids) - log.info("Prioritization Messages $ids deleted successfully.") - } - - override suspend fun deleteExpiredMessage(retentionDays: Int) { - val expiryCheckDate = controllerDate().addDate(retentionDays) - prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate) - } - - override suspend fun deleteMessageByGroup(group: String) { - prioritizationMessageRepository.deleteGroup(group) - log.info("Prioritization Messages group($group) deleted successfully.") - } - - override suspend fun deleteMessageStates(group: String, states: List) { - prioritizationMessageRepository.deleteGroupAndStateIn(group, states) - log.info("Prioritization Messages group($group) with states($states) deleted successfully.") - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt deleted file mode 100644 index 305e64ba4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -/** Sample Prioritization Service, Define spring service injector to register in application*/ -open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractMessagePrioritizationService(messagePrioritizationStateService) { - - /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messages: List) { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - sampleMessagePrioritizationHandler.handleAggregation(messages) - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) - } -} - -open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { - - /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messages: List) { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - sampleMessagePrioritizationHandler.handleAggregation(messages) - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) - } -} - -open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { - - /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messages: List) { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - sampleMessagePrioritizationHandler.handleAggregation(messages) - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) - } -} - -class SampleMessagePrioritizationHandler( - private val messagePrioritizationService: MessagePrioritizationService, - private val messagePrioritizationStateService: MessagePrioritizationStateService -) { - - private val log = logger(SampleMessagePrioritizationHandler::class) - - suspend fun handleAggregation(messages: List) { - log.info("messages(${messages.ids()}) aggregated") - /** Sequence based on Priority and Updated Date */ - val sequencedMessage = messages.orderByHighestPriority() - /** Update all messages to aggregated state */ - messagePrioritizationStateService.setMessagesState( - sequencedMessage.ids(), - MessageState.AGGREGATED.name - ) - messagePrioritizationService.output(sequencedMessage) - } - - fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { - return when (messagePrioritization.group) { - /** Dummy Implementation, This can also be read from file and stored as cached map **/ - "group-typed" -> arrayListOf("type-0", "type-1", "type-2") - "pass-typed" -> arrayListOf(messagePrioritization.type) - else -> null - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt deleted file mode 100644 index 7ab0be098..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException - -object MessageCorrelationUtils { - - /** Assumption is message is of same group **/ - fun correlatedMessages(collectedMessages: List): CorrelationCheckResponse { - val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated") - if (collectedMessages.size > 1) { - val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() } - if (filteredMessage.isNotEmpty()) { - val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() } - if (groupedMessage.size == 1) { - correlationCheckResponse.correlated = true - correlationCheckResponse.message = null - } - } - } else { - correlationCheckResponse.message = "received only one message for that group" - } - return correlationCheckResponse - } - - /** Assumption is message is of same group and checking for required types **/ - fun correlatedMessagesWithTypes(collectedMessages: List, types: List?): - CorrelationCheckResponse { - - return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { - - val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } - if (!unknownMessageTypes.isNullOrEmpty()) { - throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") - } - - val copyTypes = types.toTypedArray().copyOf().toMutableList() - - val filteredMessage = collectedMessages.filter { - !it.correlationId.isNullOrBlank() && - types.contains(it.type) - } - var correlatedKeys: MutableSet = mutableSetOf() - if (filteredMessage.isNotEmpty()) { - val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } - val foundType = correlatedMap.keys.map { it.type } - copyTypes.removeAll(foundType) - correlatedKeys = correlatedMap.keys.map { - it.correlationId - }.toMutableSet() - } - /** Check if any Types missing and same correlation id for all types */ - return if (copyTypes.isEmpty()) { - if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) - else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") - } else { - CorrelationCheckResponse(message = "couldn't find types($copyTypes)") - } - } else { - return correlatedMessages(collectedMessages) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt deleted file mode 100644 index 2c4ae30da..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate -import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate -import java.util.Date -import java.util.UUID - -object MessagePrioritizationSample { - - fun samplePrioritizationConfiguration(): PrioritizationConfiguration { - return PrioritizationConfiguration().apply { - kafkaConfiguration = KafkaConfiguration().apply { - inputTopicSelector = "prioritize-input" - outputTopic = "prioritize-output-topic" - expiredTopic = "prioritize-expired-topic" - } - natsConfiguration = NatsConfiguration().apply { - connectionSelector = "cds-controller" - inputSubject = "prioritize-input" - outputSubject = "prioritize-output" - expiredSubject = "prioritize-expired" - } - expiryConfiguration = ExpiryConfiguration().apply { - frequencyMilli = 10000L - maxPollRecord = 2000 - } - shutDownConfiguration = ShutDownConfiguration().apply { - waitMill = 2000L - } - cleanConfiguration = CleanConfiguration().apply { - frequencyMilli = 10000L - expiredRecordsHoldDays = 5 - } - } - } - - fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration { - return PrioritizationConfiguration().apply { - expiryConfiguration = ExpiryConfiguration().apply { - frequencyMilli = 10L - maxPollRecord = 2000 - } - shutDownConfiguration = ShutDownConfiguration().apply { - waitMill = 20L - } - cleanConfiguration = CleanConfiguration().apply { - frequencyMilli = 10L - expiredRecordsHoldDays = 5 - } - } - } - - private fun currentDatePlusDays(days: Int): Date { - return controllerDate().addDate(days) - } - - fun sampleMessages(messageState: String, count: Int): List { - return sampleMessages("sample-group", messageState, count) - } - - fun sampleMessages(groupName: String, messageState: String, count: Int): List { - val messages: MutableList = arrayListOf() - repeat(count) { - val backPressureMessage = createMessage( - groupName, messageState, - "sample-type", null - ) - messages.add(backPressureMessage) - } - return messages - } - - fun sampleMessageWithSameCorrelation( - groupName: String, - messageState: String, - count: Int - ): List { - val messages: MutableList = arrayListOf() - repeat(count) { - val backPressureMessage = createMessage( - groupName, messageState, "sample-type", - "key1=value1,key2=value2" - ) - messages.add(backPressureMessage) - } - return messages - } - - fun sampleMessageWithDifferentTypeSameCorrelation( - groupName: String, - messageState: String, - count: Int - ): List { - val messages: MutableList = arrayListOf() - repeat(count) { - val backPressureMessage = createMessage( - groupName, messageState, "type-$it", - "key1=value1,key2=value2" - ) - messages.add(backPressureMessage) - } - return messages - } - - fun createMessage( - groupName: String, - messageState: String, - messageType: String, - messageCorrelationId: String? - ): MessagePrioritization { - - return MessagePrioritization().apply { - id = UUID.randomUUID().toString() - group = groupName - type = messageType - state = messageState - priority = (1..10).shuffled().first() - correlationId = messageCorrelationId - message = "I am the Message" - createdDate = Date() - updatedDate = Date() - expiryDate = currentDatePlusDays(3) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt deleted file mode 100644 index 86cec3697..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.apache.kafka.streams.processor.ProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - -object MessageProcessorUtils { - - /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ - suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { - val clusterService = BluePrintDependencyService.optionalClusterService() - - return if (clusterService != null && clusterService.clusterJoined() && - !messagePrioritization.correlationId.isNullOrBlank() - ) { - // Get the correlation key in ascending order, even it it is misplaced - val correlationId = messagePrioritization.toFormatedCorrelation() - val lockName = "prioritize::${messagePrioritization.group}::$correlationId" - val clusterLock = clusterService.clusterLock(lockName) - clusterLock.lock() - if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") - clusterLock - } else null - } - - /** Utility to create the cluster lock for expiry scheduler*/ - suspend fun prioritizationExpiryLock(): ClusterLock? { - val clusterService = BluePrintDependencyService.optionalClusterService() - return if (clusterService != null && clusterService.clusterJoined()) { - val lockName = "prioritize-expiry" - val clusterLock = clusterService.clusterLock(lockName) - clusterLock.lock() - if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") - clusterLock - } else null - } - - /** Utility to create the cluster lock for expiry scheduler*/ - suspend fun prioritizationCleanLock(): ClusterLock? { - val clusterService = BluePrintDependencyService.optionalClusterService() - return if (clusterService != null && clusterService.clusterJoined()) { - val lockName = "prioritize-clean" - val clusterLock = clusterService.clusterLock(lockName) - clusterLock.lock() - if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") - clusterLock - } else null - } - - /** Utility used to cluster unlock for message [clusterLock] */ - suspend fun prioritizationUnLock(clusterLock: ClusterLock?) { - if (clusterLock != null) { - clusterLock.unLock() - clusterLock.close() - } - } - - /** Get the Kafka Supplier for processor lookup [name] **/ - fun bluePrintProcessorSupplier(name: String): ProcessorSupplier { - return ProcessorSupplier { - // Dynamically resolve the Prioritization Processor - BluePrintDependencyService.instance>(name) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt deleted file mode 100644 index 286a9b5c1..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import io.mockk.coEvery -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.junit.Before -import org.junit.runner.RunWith -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample -import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils -import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest -import org.springframework.context.ApplicationContext -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.TestPropertySource -import org.springframework.test.context.junit4.SpringRunner -import kotlin.test.Test -import kotlin.test.assertNotNull - -@RunWith(SpringRunner::class) -@DataJpaTest -@DirtiesContext -@ContextConfiguration( - classes = [ - BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, - MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class - ] -) -@TestPropertySource( - properties = - [ - "spring.jpa.show-sql=false", - "spring.jpa.properties.hibernate.show_sql=false", - "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", - - "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth", - "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", - "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", - "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword", - "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword", - "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user", - "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword", - - // To send initial test message - "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", - "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword", - "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword", - "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user", - "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword", - - "blueprintsprocessor.nats.cds-controller.type=token-auth", - "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", - "blueprintsprocessor.nats.cds-controller.token=tokenAuth" - ] -) -open class MessagePrioritizationConsumerTest { - - private val log = logger(MessagePrioritizationConsumerTest::class) - - @Autowired - lateinit var applicationContext: ApplicationContext - - @Autowired - lateinit var prioritizationMessageRepository: PrioritizationMessageRepository - - @Autowired - lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService - - @Autowired - lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService - - @Autowired - lateinit var messagePrioritizationStateService: MessagePrioritizationStateService - - @Before - fun setup() { - BluePrintDependencyService.inject(applicationContext) - } - - @Test - fun testBluePrintKafkaJDBCKeyStore() { - runBlocking { - assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") - - val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService - .instance(MessagePrioritizationStateService::class) - assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") - - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { - val message = messagePrioritizationService.saveMessage(it) - val repoResult = messagePrioritizationService.getMessage(message.id) - assertNotNull(repoResult, "failed to get inserted message.") - } - } - } - - @Test - fun testMessagePrioritizationService() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - val messagePrioritizationService = - SampleMessagePrioritizationService(messagePrioritizationStateService) - messagePrioritizationService.setConfiguration(configuration) - - log.info("**************** without Correlation **************") - /** Checking without correlation */ - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { - messagePrioritizationService.prioritize(it) - } - log.info("**************** Same Group , with Correlation **************") - /** checking same group with correlation */ - MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(10) - messagePrioritizationService.prioritize(it) - } - log.info("**************** Different Type , with Correlation **************") - /** checking different type, with correlation */ - MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(10) - messagePrioritizationService.prioritize(it) - } - } - } - - @Test - fun testStartConsuming() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - - val streamingConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector) - assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") - - val spyStreamingConsumerService = spyk(streamingConsumerService) - coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit - coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService, mockk() - ) - val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) - - // Test Topology - val kafkaStreamConsumerFunction = - spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) - val messageConsumerProperties = bluePrintMessageLibPropertyService - .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") - val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) - assertNotNull(topology, "failed to get create topology") - - every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService - spyMessagePrioritizationConsumer.startConsuming(configuration) - spyMessagePrioritizationConsumer.shutDown() - } - } - - @Test - fun testSchedulerService() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - val messagePrioritizationService = - SampleMessagePrioritizationService(messagePrioritizationStateService) - messagePrioritizationService.setConfiguration(configuration) - - val messagePrioritizationSchedulerService = - MessagePrioritizationSchedulerService(messagePrioritizationService) - launch { - messagePrioritizationSchedulerService.startScheduling() - } - launch { - /** To debug increase the delay time */ - delay(20) - messagePrioritizationSchedulerService.shutdownScheduling() - } - } - } - - /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - // @Test - fun testKafkaMessagePrioritizationConsumer() { - runBlocking { - - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - val kafkaMessagePrioritizationService = - SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) - kafkaMessagePrioritizationService.setConfiguration(configuration) - - val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( - messagePrioritizationStateService, - kafkaMessagePrioritizationService - ) - - // Register the processor - BluePrintDependencyService.registerSingleton( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - defaultMessagePrioritizeProcessor - ) - - val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService, - kafkaMessagePrioritizationService - ) - messagePrioritizationConsumer.startConsuming(configuration) - - /** Send sample message with every 1 sec */ - val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService - launch { - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { - delay(100) - val headers: MutableMap = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB( - key = "mykey", - message = it.asJsonString(false), - headers = headers - ) - } - - MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(100) - val headers: MutableMap = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB( - key = "mykey", - message = it.asJsonString(false), - headers = headers - ) - } - - MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(2000) - val headers: MutableMap = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB( - key = "mykey", - message = it.asJsonString(false), - headers = headers - ) - } - } - delay(10000) - messagePrioritizationConsumer.shutDown() - } - } - - /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker - * Start : - * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V - * */ - // @Test - fun testNatsMessagePrioritizationConsumer() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") - - val inputSubject = - NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) - - val natsMessagePrioritizationService = - SampleNatsMessagePrioritizationService(messagePrioritizationStateService) - natsMessagePrioritizationService.setConfiguration(configuration) - - val messagePrioritizationConsumer = - NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) - messagePrioritizationConsumer.startConsuming() - - /** Send sample message with every 1 sec */ - val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService - - launch { - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { - delay(100) - bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) - } - - MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(100) - bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) - } - - MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(200) - bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) - } - } - delay(3000) - messagePrioritizationConsumer.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt deleted file mode 100644 index 22c399608..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.springframework.boot.autoconfigure.EnableAutoConfiguration -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate -import org.springframework.stereotype.Service -import javax.sql.DataSource - -@Configuration -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"]) -@EnableAutoConfiguration -open class TestDatabaseConfiguration { - - @Bean("primaryDBLibGenericService") - open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService { - return PrimaryDBLibGenericService( - NamedParameterJdbcTemplate(dataSource) - ) - } -} - -/* Sample Prioritization Listener, used during Application startup -@Component -open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { - - private val log = logger(SamplePrioritizationListeners::class) - - @EventListener(ApplicationReadyEvent::class) - open fun init() = runBlocking { - log.info("Starting PrioritizationListeners...") - defaultMessagePrioritizationConsumer - .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) - } - - @PreDestroy - open fun destroy() = runBlocking { - log.info("Shutting down PrioritizationListeners...") - defaultMessagePrioritizationConsumer.shutDown() - } -} - */ - -@Service -open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : - SampleMessagePrioritizationService(messagePrioritizationStateService) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt deleted file mode 100644 index 73d3738e5..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -class MessageCorrelationUtilsTest { - - @Test - fun testCorrelationKeysReordered() { - - val message1 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2" - ) - val message2 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-0", "key2=value2,key1=value1" - ) - - val multipleMessages: MutableList = arrayListOf() - multipleMessages.add(message1) - multipleMessages.add(message2) - val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages) - assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered") - } - - @Test - fun differentTypesWithSameCorrelationMessages() { - /** With Types **/ - /* Assumption is Same group with different types */ - val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) - val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2") - ) - assertTrue( - differentTypesWithSameCorrelationMessagesResponse.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResponse" - ) - - /* Assumption is Same group with different types and one missing expected types, - In this case type-3 message is missing */ - val differentTypesWithSameCorrelationMessagesResWithMissingType = - MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2", "type-3") - ) - assertTrue( - !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" - ) - } - - @Test - fun withSameCorrelationMessagesWithIgnoredTypes() { - /** With ignoring Types */ - /** Assumption is only one message received */ - val withSameCorrelationOneMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) - val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationOneMessages, null - ) - assertTrue( - !withSameCorrelationOneMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp" - ) - - /** Assumption is two message received for same group with same correlation */ - val withSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) - val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationMessages, null - ) - assertTrue( - withSameCorrelationMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp" - ) - } - - @Test - fun differentTypesWithDifferentCorrelationMessage() { - /** Assumption is two message received for same group with different expected types and different correlation */ - val message1 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2" - ) - val message2 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-1", "key1=value1,key2=value3" - ) - val differentTypesWithDifferentCorrelationMessage: MutableList = arrayListOf() - differentTypesWithDifferentCorrelationMessage.add(message1) - differentTypesWithDifferentCorrelationMessage.add(message2) - val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithDifferentCorrelationMessage, - arrayListOf("type-0", "type-1") - ) - assertTrue( - !differentTypesWithDifferentCorrelationMessageResp.correlated, - "failed to correlate differentTypesWithDifferentCorrelationMessageResp" - ) - } - - @Test - fun testPrioritizationOrdering() { - val differentPriorityMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5) - val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority() - assertNotNull(orderedPriorityMessages, "failed to order the priority messages") - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml deleted file mode 100644 index e3a1f7a01..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - ${localPattern} - - - - - - - - - - - - - diff --git a/ms/blueprintsprocessor/functions/message-prioritization/README.md b/ms/blueprintsprocessor/functions/message-prioritization/README.md new file mode 100644 index 000000000..cda43faca --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/README.md @@ -0,0 +1,26 @@ + +To Delete Topics +------------------ +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic +kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog + +Create Topics +-------------- + +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic + +To List topics +---------------- +kafka-topics --list --bootstrap-server localhost:9092 + +To publish message +-------------------- +kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic + +To Listen for Output +---------------------- +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning diff --git a/ms/blueprintsprocessor/functions/message-prioritization/pom.xml b/ms/blueprintsprocessor/functions/message-prioritization/pom.xml new file mode 100644 index 000000000..9b6f3b1c3 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/pom.xml @@ -0,0 +1,43 @@ + + + + + 4.0.0 + + + org.onap.ccsdk.cds.blueprintsprocessor + blueprintsprocessor-functions + 1.1.0-SNAPSHOT + + + org.onap.ccsdk.cds.blueprintsprocessor.functions + message-prioritization + + MS Blueprints Processor Functions - Message Prioritization + Blueprints Processor Function - Message Prioritization + + + + org.onap.ccsdk.cds.blueprintsprocessor.modules + message-lib + + + com.h2database + h2 + + + diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt new file mode 100644 index 000000000..890e0a6ba --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -0,0 +1,33 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +open class MessagePrioritizationConfiguration + +object MessagePrioritizationConstants { + + const val SOURCE_INPUT = "source-prioritization-input" + + const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" + + const val SINK_OUTPUT = "sink-prioritization-output" +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt new file mode 100644 index 000000000..65b7644a8 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -0,0 +1,89 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import java.io.Serializable + +object MessageActionConstants { + + const val PRIORITIZE = "prioritize" +} + +enum class MessageState(val id: String) { + NEW("new"), + WAIT("wait"), + EXPIRED("expired"), + PRIORITIZED("prioritized"), + AGGREGATED("aggregated"), + COMPLETED("completed"), + ERROR("error") +} + +open class PrioritizationConfiguration : Serializable { + + lateinit var expiryConfiguration: ExpiryConfiguration + lateinit var shutDownConfiguration: ShutDownConfiguration + lateinit var cleanConfiguration: CleanConfiguration + var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration + var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration +} + +open class KafkaConfiguration : Serializable { + + lateinit var inputTopicSelector: String // Consumer Configuration Selector + lateinit var expiredTopic: String // Publish Configuration Selector + lateinit var outputTopic: String // Publish Configuration Selector +} + +open class NatsConfiguration : Serializable { + + lateinit var connectionSelector: String // Consumer Configuration Selector + lateinit var inputSubject: String // Publish Configuration Selector + lateinit var expiredSubject: String // Publish Configuration Selector + lateinit var outputSubject: String // Publish Configuration Selector +} + +open class ExpiryConfiguration : Serializable { + + var frequencyMilli: Long = 30000L + var maxPollRecord: Int = 1000 +} + +open class ShutDownConfiguration : Serializable { + + var waitMill: Long = 30000L +} + +open class CleanConfiguration : Serializable { + + var frequencyMilli: Long = 30000L + var expiredRecordsHoldDays: Int = 5 +} + +open class UpdateStateRequest : Serializable { + + lateinit var id: String + var group: String? = null + var state: String? = null +} + +data class CorrelationCheckResponse( + var message: String? = null, + var correlated: Boolean = false +) + +data class TypeCorrelationKey(val type: String, val correlationId: String) diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt new file mode 100644 index 000000000..dfe516953 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -0,0 +1,37 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization + +interface MessagePrioritizationService { + + fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) + + fun getConfiguration(): PrioritizationConfiguration + + suspend fun prioritize(messagePrioritization: MessagePrioritization) + + /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ + suspend fun output(messages: List) + + /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ + suspend fun updateExpiredMessages() + + /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ + suspend fun cleanExpiredMessage() +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt new file mode 100644 index 000000000..2e5e6c617 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt @@ -0,0 +1,72 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import java.util.Date + +interface MessagePrioritizationStateService { + + suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization + + suspend fun getMessage(id: String): MessagePrioritization + + suspend fun getMessages(ids: List): List? + + suspend fun getExpiryEligibleMessages(count: Int): List? + + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? + + suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? + + suspend fun getExpiredMessages(expiryDate: Date, count: Int): List? + + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? + + suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? + + suspend fun updateMessagesState(ids: List, state: String) + + suspend fun updateMessageState(id: String, state: String): MessagePrioritization + + suspend fun setMessageState(id: String, state: String) + + suspend fun setMessagesPriority(ids: List, priority: String) + + suspend fun setMessagesState(ids: List, state: String) + + suspend fun setMessageStateANdError(id: String, state: String, error: String) + + suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) + + suspend fun deleteMessage(id: String) + + suspend fun deleteMessages(id: List) + + suspend fun deleteExpiredMessage(retentionDays: Int) + + suspend fun deleteMessageByGroup(group: String) + + suspend fun deleteMessageStates(group: String, states: List) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt new file mode 100644 index 000000000..d8e71d413 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -0,0 +1,67 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** + * Register the MessagePrioritizationStateService and exposed dependency + */ +fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = + instance(MessagePrioritizationStateService::class) + +/** + * Expose messagePrioritizationStateService to AbstractComponentFunction + */ +fun AbstractComponentFunction.messagePrioritizationStateService() = + BluePrintDependencyService.messagePrioritizationStateService() + +/** + * MessagePrioritization correlation extensions + */ + +/** + * Arrange comma separated correlation keys in ascending order. + */ +fun MessagePrioritization.toFormatedCorrelation(): String { + return this.correlationId!!.split(",") + .map { it.trim() }.sorted().joinToString(",") +} + +/** + * Used to group the correlation with respect to types. + */ +fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { + return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) +} + +/** get list of message ids **/ +fun List.ids(): List { + return this.map { it.id } +} + +/** Ordered by highest priority and updated date **/ +fun List.orderByHighestPriority(): List { + return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate)) +} + +/** Ordered by Updated date **/ +fun List.orderByUpdatedDate(): List { + return this.sortedWith(compareBy(MessagePrioritization::updatedDate)) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt new file mode 100644 index 000000000..c7aab03b6 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -0,0 +1,80 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.mdcWebCoroutineScope +import org.springframework.http.MediaType +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.ResponseBody +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping(value = ["/api/v1/message-prioritization"]) +open class MessagePrioritizationApi( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val messagePrioritizationService: MessagePrioritizationService +) { + + @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + suspend fun ping(): String = mdcWebCoroutineScope { "Success" } + + @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + suspend fun messagePrioritization(@PathVariable(value = "id") id: String) = mdcWebCoroutineScope { + messagePrioritizationStateService.getMessage(id) + } + + @PostMapping( + path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + @ResponseBody + suspend fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = + mdcWebCoroutineScope { + messagePrioritizationStateService.saveMessage(messagePrioritization) + } + + @PostMapping( + path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + @ResponseBody + suspend fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = mdcWebCoroutineScope { + messagePrioritizationService.prioritize(messagePrioritization) + } + + @PostMapping( + path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + suspend fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = + mdcWebCoroutineScope { + messagePrioritizationStateService.setMessageState( + updateMessageState.id, + updateMessageState.state!! + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt new file mode 100644 index 000000000..ce2085f68 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt @@ -0,0 +1,89 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import com.fasterxml.jackson.annotation.JsonFormat +import org.hibernate.annotations.Proxy +import org.springframework.data.annotation.LastModifiedDate +import org.springframework.data.jpa.domain.support.AuditingEntityListener +import org.springframework.data.jpa.repository.config.EnableJpaAuditing +import java.util.Date +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.EntityListeners +import javax.persistence.Id +import javax.persistence.Lob +import javax.persistence.Table +import javax.persistence.Temporal +import javax.persistence.TemporalType + +@EnableJpaAuditing +@EntityListeners(AuditingEntityListener::class) +@Entity +@Table(name = "MESSAGE_PRIORITIZATION") +@Proxy(lazy = false) +open class MessagePrioritization { + + @Id + @Column(name = "message_id", length = 50) + lateinit var id: String + + @Column(name = "message_group", length = 50, nullable = false) + lateinit var group: String + + @Column(name = "message_type", length = 50, nullable = false) + lateinit var type: String + + /** States Defined by MessageState */ + @Column(name = "message_state", length = 20, nullable = false) + lateinit var state: String + + @Column(name = "priority", nullable = false) + var priority: Int = 5 + + @Lob + @Column(name = "message", nullable = false) + var message: String? = null + + @Lob + @Column(name = "error", nullable = true) + var error: String? = null + + @Lob + @Column(name = "aggregated_message_ids", nullable = true) + var aggregatedMessageIds: String? = null + + @Lob + @Column(name = "correlation_id", nullable = true) + var correlationId: String? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "created_date", nullable = false) + var createdDate = Date() + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @LastModifiedDate + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "updated_date", nullable = false) + var updatedDate: Date? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "expiry_date", nullable = false) + var expiryDate: Date? = null +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt new file mode 100644 index 000000000..0b35e3856 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt @@ -0,0 +1,175 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import org.springframework.data.domain.Pageable +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Repository +@Transactional(readOnly = true) +interface PrioritizationMessageRepository : JpaRepository { + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") + fun findByGroup(group: String, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateIn(group: String, states: List, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.updatedDate asc" + ) + fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List, count: Pageable): + List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndNotExpiredDate( + group: String, + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByStateInAndExpiredDate( + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndExpiredDate( + group: String, + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndCorrelationId(group: String, states: List, correlationId: String): + List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndTypesAndCorrelationId( + group: String, + states: List, + types: List, + correlationId: String + ): List? + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateForMessageId(id: String, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setStateForMessageIds(ids: List, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setPriorityForMessageIds(ids: List, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, " + + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" + ) + fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE id IN :ids") + fun deleteByIds(ids: List) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ") + fun deleteByExpiryDate(expiryCheckDate: Date) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE group = :group") + fun deleteGroup(group: String) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states") + fun deleteGroupAndStateIn(group: String, states: List) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt new file mode 100644 index 000000000..112a80379 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractKafkaMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractKafkaMessagePrioritizationService::class) + + lateinit var processorContext: ProcessorContext + + fun setKafkaProcessorContext(processorContext: ProcessorContext) { + this.processorContext = processorContext + } + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + processorContext.forward( + updatedMessage.id, + updatedMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + processorContext.forward( + expiredMessage.id, expiredMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..d4f8470c8 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -0,0 +1,28 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor + +/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ +abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { + + override fun init(processorContext: ProcessorContext) { + this.processorContext = processorContext + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..1b0612492 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -0,0 +1,78 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils + +open class DefaultMessagePrioritizeProcessor( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService +) : AbstractMessagePrioritizeProcessor() { + + private val log = logger(DefaultMessagePrioritizeProcessor::class) + + override suspend fun processNB(key: ByteArray, value: ByteArray) { + + val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + try { + kafkaMessagePrioritizationService.prioritize(messagePrioritize) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + /** Publish to Output topic */ + this.processorContext.forward( + messagePrioritize.id, messagePrioritize, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override fun init(context: ProcessorContext) { + super.init(context) + /** Set Configuration and Processor Context to messagePrioritizationService */ + if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { + kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) + } else { + throw BluePrintProcessorException( + "messagePrioritizationService is not instance of " + + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" + ) + } + } + + override fun close() { + log.info( + "closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..4ab399f54 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -0,0 +1,108 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList + +open class KafkaMessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(KafkaMessagePrioritizationConsumer::class) + + private lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + kafkaConsumerConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (::streamingConsumerService.isInitialized) { + streamingConsumerService.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt new file mode 100644 index 000000000..5595863d4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.nio.charset.Charset + +open class MessagePrioritizationSerde : Serde { + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer { + return object : Deserializer { + override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { + return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer { + return object : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt new file mode 100644 index 000000000..502a7822d --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -0,0 +1,85 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractNatsMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractNatsMessagePrioritizationService::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + updatedMessage.asJsonType().asByteArray() + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + expiredMessage.asJsonType().asByteArray() + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..a0b2cf462 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -0,0 +1,92 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import io.nats.streaming.MessageHandler +import io.nats.streaming.Subscription +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsMessagePrioritizationConsumer( + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, + private val natsMessagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(NatsMessagePrioritizationConsumer::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + private lateinit var subscription: Subscription + + suspend fun startConsuming() { + val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() + val natsConfiguration = prioritizationConfiguration.natsConfiguration + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") + + check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { + "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." + } + bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) + natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService + val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) + val loadBalanceGroup = ClusterUtils.applicationName() + val messageHandler = createMessageHandler() + val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) + subscription = bluePrintNatsService.loadBalanceSubscribe( + inputSubject, + loadBalanceGroup, + messageHandler, + subscriptionOptions + ) + log.info( + "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." + ) + } + + suspend fun shutDown() { + if (::subscription.isInitialized) { + subscription.unsubscribe() + } + log.info("Nats prioritization consumer listener shutdown complete") + } + + private fun consumerService(selector: String): BluePrintNatsService { + return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) + } + + private fun createMessageHandler(): MessageHandler { + return MessageHandler { message -> + try { + val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) + runBlocking { + natsMessagePrioritizationService.prioritize(messagePrioritization) + } + } catch (e: Exception) { + log.error("failed to process prioritize message", e) + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt new file mode 100644 index 000000000..f4602a810 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -0,0 +1,203 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/ +abstract class AbstractMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : MessagePrioritizationService { + + private val log = logger(AbstractMessagePrioritizationService::class) + + lateinit var prioritizationConfiguration: PrioritizationConfiguration + + override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { + this.prioritizationConfiguration = prioritizationConfiguration + } + + override fun getConfiguration(): PrioritizationConfiguration { + return this.prioritizationConfiguration + } + + override suspend fun prioritize(messagePrioritize: MessagePrioritization) { + try { + log.info("***** received in prioritize processor key(${messagePrioritize.id})") + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + /** Get the cluster lock for message group */ + val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) + // Save the Message + messagePrioritizationStateService.saveMessage(messagePrioritize) + handleCorrelationAndNextStep(messagePrioritize) + /** Cluster unLock for message group */ + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + } + } + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + messages.forEach { message -> + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + } + } + + override suspend fun updateExpiredMessages() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } + + override suspend fun cleanExpiredMessage() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + val clusterLock = MessageProcessorUtils.prioritizationCleanLock() + try { + messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) + } catch (e: Exception) { + log.error("failed in clean expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } + + open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { + /** Check correlation enabled and correlation field has populated */ + if (!messagePrioritization.correlationId.isNullOrBlank()) { + val id = messagePrioritization.id + val group = messagePrioritization.group + val correlationId = messagePrioritization.correlationId!! + val types = getGroupCorrelationTypes(messagePrioritization) + log.info( + "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " + + "correlation types($types), priority(${messagePrioritization.priority}), " + + "correlation id($correlationId)" + ) + + /** Get all previously received messages from database for group and optional types and correlation Id */ + val waitingCorrelatedStoreMessages = messagePrioritizationStateService + .getCorrelatedMessages( + group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId + ) + + /** If multiple records found, then check correlation */ + if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { + /** Check all correlation satisfies */ + val correlationResults = MessageCorrelationUtils + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + + if (correlationResults.correlated) { + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState( + waitingCorrelatedStoreMessages.ids(), + MessageState.PRIORITIZED.name + ) + /** Correlation satisfied, Send only correlated messages to aggregate processor */ + aggregate(waitingCorrelatedStoreMessages) + } else { + /** Correlation not satisfied */ + log.trace("correlation not matched : ${correlationResults.message}") + // Update the Message state to Wait + messagePrioritizationStateService.setMessagesState( + waitingCorrelatedStoreMessages.ids(), + MessageState.WAIT.name + ) + } + } else { + /** received first message of group and correlation Id, update the message with wait state */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) + } + } else { + /** No Correlation check needed, simply forward to next processor. */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) + aggregate(arrayListOf(messagePrioritization)) + } + } + + open suspend fun aggregate(messages: List) { + log.info("@@@@@ received in aggregation processor ids(${messages.ids()}") + if (!messages.isNullOrEmpty()) { + try { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(messages) + } catch (e: Exception) { + val error = "failed in aggregate message(${messages.ids()}) : ${e.message}" + if (!messages.isNullOrEmpty()) { + messages.forEach { messagePrioritization -> + try { + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritization.id, + MessageState.ERROR.name, error + ) + } catch (sendException: Exception) { + log.error( + "failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", + e + ) + } + } + /** Publish to output topic */ + output(messages) + } + } + } + } + + /** Child will override this implementation , if necessary + * Here the place child has to implement custom Sequencing and Aggregation logic. + * */ + abstract suspend fun handleAggregation(messages: List) + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt new file mode 100644 index 000000000..529d773a4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.stereotype.Service + +@Service +open class MessagePrioritizationSchedulerService( + private val messagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(MessagePrioritizationSchedulerService::class) + + @Volatile + var keepGoing = true + + /** This is sample scheduler implementation used during starting application with configuration. + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + */ + + open suspend fun startScheduling() { + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + + log.info("Starting Prioritization Scheduler Service...") + GlobalScope.launch { + expiryScheduler(prioritizationConfiguration) + } + GlobalScope.launch { + cleanUpScheduler(prioritizationConfiguration) + } + } + + open suspend fun shutdownScheduling() { + keepGoing = false + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + delay(prioritizationConfiguration.shutDownConfiguration.waitMill) + } + + private suspend fun expiryScheduler( + prioritizationConfiguration: PrioritizationConfiguration + ) { + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec") + withContext(Dispatchers.Default) { + while (keepGoing) { + try { + messagePrioritizationService.updateExpiredMessages() + delay(expiryConfiguration.frequencyMilli) + } catch (e: Exception) { + log.error("failed in prioritization expiry scheduler", e) + } + } + } + } + + private suspend fun cleanUpScheduler( + prioritizationConfiguration: PrioritizationConfiguration + ) { + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec") + withContext(Dispatchers.Default) { + while (keepGoing) { + try { + messagePrioritizationService.cleanExpiredMessage() + delay(cleanConfiguration.frequencyMilli) + } catch (e: Exception) { + log.error("failed in prioritization clean scheduler", e) + } + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt new file mode 100644 index 000000000..ed16fd44f --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -0,0 +1,176 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate +import org.springframework.data.domain.PageRequest +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Service +open class MessagePrioritizationStateServiceImpl( + private val prioritizationMessageRepository: PrioritizationMessageRepository +) : MessagePrioritizationStateService { + + private val log = logger(MessagePrioritizationStateServiceImpl::class) + + @Transactional + override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { + if (!message.correlationId.isNullOrBlank()) { + message.correlationId = message.toFormatedCorrelation() + } + message.updatedDate = Date() + return prioritizationMessageRepository.save(message) + } + + override suspend fun getMessage(id: String): MessagePrioritization { + return prioritizationMessageRepository.findById(id).orElseGet(null) + ?: throw BluePrintProcessorException("couldn't find message for id($id)") + } + + override suspend fun getMessages(ids: List): List? { + return prioritizationMessageRepository.findAllById(ids) + } + + override suspend fun getExpiryEligibleMessages(count: Int): List? { + return prioritizationMessageRepository + .findByStateInAndExpiredDate( + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List? { + return prioritizationMessageRepository.findByExpiredDate( + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndExpiredDate( + group, + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? { + return if (!types.isNullOrEmpty()) { + prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) + } else { + prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) + } + } + + @Transactional + override suspend fun updateMessagesState(ids: List, state: String) { + ids.forEach { + val updated = updateMessageState(it, state) + log.info("message($it) update to state(${updated.state})") + } + } + + @Transactional + override suspend fun setMessageState(id: String, state: String) { + prioritizationMessageRepository.setStateForMessageId(id, state, Date()) + } + + @Transactional + override suspend fun setMessagesPriority(ids: List, priority: String) { + prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date()) + } + + @Transactional + override suspend fun setMessagesState(ids: List, state: String) { + prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) + } + + @Transactional + override suspend fun setMessageStateANdError(id: String, state: String, error: String) { + prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) + } + + @Transactional + override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + } + return saveMessage(updateMessage) + } + + @Transactional + override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) { + val groupedIds = aggregatedIds.joinToString(",") + prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) + } + + override suspend fun deleteMessage(id: String) { + prioritizationMessageRepository.deleteById(id) + log.info("Prioritization Messages $id deleted successfully.") + } + + override suspend fun deleteMessages(ids: List) { + prioritizationMessageRepository.deleteByIds(ids) + log.info("Prioritization Messages $ids deleted successfully.") + } + + override suspend fun deleteExpiredMessage(retentionDays: Int) { + val expiryCheckDate = controllerDate().addDate(retentionDays) + prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate) + } + + override suspend fun deleteMessageByGroup(group: String) { + prioritizationMessageRepository.deleteGroup(group) + log.info("Prioritization Messages group($group) deleted successfully.") + } + + override suspend fun deleteMessageStates(group: String, states: List) { + prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + log.info("Prioritization Messages group($group) with states($states) deleted successfully.") + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt new file mode 100644 index 000000000..305e64ba4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -0,0 +1,120 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** Sample Prioritization Service, Define spring service injector to register in application*/ +open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +class SampleMessagePrioritizationHandler( + private val messagePrioritizationService: MessagePrioritizationService, + private val messagePrioritizationStateService: MessagePrioritizationStateService +) { + + private val log = logger(SampleMessagePrioritizationHandler::class) + + suspend fun handleAggregation(messages: List) { + log.info("messages(${messages.ids()}) aggregated") + /** Sequence based on Priority and Updated Date */ + val sequencedMessage = messages.orderByHighestPriority() + /** Update all messages to aggregated state */ + messagePrioritizationStateService.setMessagesState( + sequencedMessage.ids(), + MessageState.AGGREGATED.name + ) + messagePrioritizationService.output(sequencedMessage) + } + + fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + return when (messagePrioritization.group) { + /** Dummy Implementation, This can also be read from file and stored as cached map **/ + "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + "pass-typed" -> arrayListOf(messagePrioritization.type) + else -> null + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt new file mode 100644 index 000000000..7ab0be098 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +object MessageCorrelationUtils { + + /** Assumption is message is of same group **/ + fun correlatedMessages(collectedMessages: List): CorrelationCheckResponse { + val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated") + if (collectedMessages.size > 1) { + val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() } + if (filteredMessage.isNotEmpty()) { + val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() } + if (groupedMessage.size == 1) { + correlationCheckResponse.correlated = true + correlationCheckResponse.message = null + } + } + } else { + correlationCheckResponse.message = "received only one message for that group" + } + return correlationCheckResponse + } + + /** Assumption is message is of same group and checking for required types **/ + fun correlatedMessagesWithTypes(collectedMessages: List, types: List?): + CorrelationCheckResponse { + + return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { + + val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } + if (!unknownMessageTypes.isNullOrEmpty()) { + throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") + } + + val copyTypes = types.toTypedArray().copyOf().toMutableList() + + val filteredMessage = collectedMessages.filter { + !it.correlationId.isNullOrBlank() && + types.contains(it.type) + } + var correlatedKeys: MutableSet = mutableSetOf() + if (filteredMessage.isNotEmpty()) { + val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } + val foundType = correlatedMap.keys.map { it.type } + copyTypes.removeAll(foundType) + correlatedKeys = correlatedMap.keys.map { + it.correlationId + }.toMutableSet() + } + /** Check if any Types missing and same correlation id for all types */ + return if (copyTypes.isEmpty()) { + if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) + else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") + } else { + CorrelationCheckResponse(message = "couldn't find types($copyTypes)") + } + } else { + return correlatedMessages(collectedMessages) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt new file mode 100644 index 000000000..2c4ae30da --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -0,0 +1,148 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate +import java.util.Date +import java.util.UUID + +object MessagePrioritizationSample { + + fun samplePrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + kafkaConfiguration = KafkaConfiguration().apply { + inputTopicSelector = "prioritize-input" + outputTopic = "prioritize-output-topic" + expiredTopic = "prioritize-expired-topic" + } + natsConfiguration = NatsConfiguration().apply { + connectionSelector = "cds-controller" + inputSubject = "prioritize-input" + outputSubject = "prioritize-output" + expiredSubject = "prioritize-expired" + } + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10000L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 2000L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10000L + expiredRecordsHoldDays = 5 + } + } + } + + fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 20L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10L + expiredRecordsHoldDays = 5 + } + } + } + + private fun currentDatePlusDays(days: Int): Date { + return controllerDate().addDate(days) + } + + fun sampleMessages(messageState: String, count: Int): List { + return sampleMessages("sample-group", messageState, count) + } + + fun sampleMessages(groupName: String, messageState: String, count: Int): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage( + groupName, messageState, + "sample-type", null + ) + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage( + groupName, messageState, "sample-type", + "key1=value1,key2=value2" + ) + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithDifferentTypeSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage( + groupName, messageState, "type-$it", + "key1=value1,key2=value2" + ) + messages.add(backPressureMessage) + } + return messages + } + + fun createMessage( + groupName: String, + messageState: String, + messageType: String, + messageCorrelationId: String? + ): MessagePrioritization { + + return MessagePrioritization().apply { + id = UUID.randomUUID().toString() + group = groupName + type = messageType + state = messageState + priority = (1..10).shuffled().first() + correlationId = messageCorrelationId + message = "I am the Message" + createdDate = Date() + updatedDate = Date() + expiryDate = currentDatePlusDays(3) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt new file mode 100644 index 000000000..86cec3697 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -0,0 +1,86 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +object MessageProcessorUtils { + + /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ + suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + + return if (clusterService != null && clusterService.clusterJoined() && + !messagePrioritization.correlationId.isNullOrBlank() + ) { + // Get the correlation key in ascending order, even it it is misplaced + val correlationId = messagePrioritization.toFormatedCorrelation() + val lockName = "prioritize::${messagePrioritization.group}::$correlationId" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility to create the cluster lock for expiry scheduler*/ + suspend fun prioritizationExpiryLock(): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritize-expiry" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility to create the cluster lock for expiry scheduler*/ + suspend fun prioritizationCleanLock(): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritize-clean" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility used to cluster unlock for message [clusterLock] */ + suspend fun prioritizationUnLock(clusterLock: ClusterLock?) { + if (clusterLock != null) { + clusterLock.unLock() + clusterLock.close() + } + } + + /** Get the Kafka Supplier for processor lookup [name] **/ + fun bluePrintProcessorSupplier(name: String): ProcessorSupplier { + return ProcessorSupplier { + // Dynamically resolve the Prioritization Processor + BluePrintDependencyService.instance>(name) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt new file mode 100644 index 000000000..286a9b5c1 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -0,0 +1,350 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import io.mockk.coEvery +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.context.ApplicationContext +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.Test +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@DataJpaTest +@DirtiesContext +@ContextConfiguration( + classes = [ + BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class + ] +) +@TestPropertySource( + properties = + [ + "spring.jpa.show-sql=false", + "spring.jpa.properties.hibernate.show_sql=false", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword", + "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword", + "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user", + "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", + "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword", + "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword", + "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user", + "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword", + + "blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" + ] +) +open class MessagePrioritizationConsumerTest { + + private val log = logger(MessagePrioritizationConsumerTest::class) + + @Autowired + lateinit var applicationContext: ApplicationContext + + @Autowired + lateinit var prioritizationMessageRepository: PrioritizationMessageRepository + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Autowired + lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService + + @Autowired + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + + @Before + fun setup() { + BluePrintDependencyService.inject(applicationContext) + } + + @Test + fun testBluePrintKafkaJDBCKeyStore() { + runBlocking { + assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") + + val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService + .instance(MessagePrioritizationStateService::class) + assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") + + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { + val message = messagePrioritizationService.saveMessage(it) + val repoResult = messagePrioritizationService.getMessage(message.id) + assertNotNull(repoResult, "failed to get inserted message.") + } + } + } + + @Test + fun testMessagePrioritizationService() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + log.info("**************** without Correlation **************") + /** Checking without correlation */ + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + messagePrioritizationService.prioritize(it) + } + log.info("**************** Same Group , with Correlation **************") + /** checking same group with correlation */ + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + log.info("**************** Different Type , with Correlation **************") + /** checking different type, with correlation */ + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + } + } + + @Test + fun testStartConsuming() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + + val streamingConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector) + assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") + + val spyStreamingConsumerService = spyk(streamingConsumerService) + coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit + coEvery { spyStreamingConsumerService.shutDown() } returns Unit + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, mockk() + ) + val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) + + // Test Topology + val kafkaStreamConsumerFunction = + spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) + assertNotNull(topology, "failed to get create topology") + + every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService + spyMessagePrioritizationConsumer.startConsuming(configuration) + spyMessagePrioritizationConsumer.shutDown() + } + } + + @Test + fun testSchedulerService() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationSchedulerService = + MessagePrioritizationSchedulerService(messagePrioritizationService) + launch { + messagePrioritizationSchedulerService.startScheduling() + } + launch { + /** To debug increase the delay time */ + delay(20) + messagePrioritizationSchedulerService.shutdownScheduling() + } + } + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + // @Test + fun testKafkaMessagePrioritizationConsumer() { + runBlocking { + + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val kafkaMessagePrioritizationService = + SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) + kafkaMessagePrioritizationService.setConfiguration(configuration) + + val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( + messagePrioritizationStateService, + kafkaMessagePrioritizationService + ) + + // Register the processor + BluePrintDependencyService.registerSingleton( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + defaultMessagePrioritizeProcessor + ) + + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, + kafkaMessagePrioritizationService + ) + messagePrioritizationConsumer.startConsuming(configuration) + + /** Send sample message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + key = "mykey", + message = it.asJsonString(false), + headers = headers + ) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + key = "mykey", + message = it.asJsonString(false), + headers = headers + ) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(2000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + key = "mykey", + message = it.asJsonString(false), + headers = headers + ) + } + } + delay(10000) + messagePrioritizationConsumer.shutDown() + } + } + + /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker + * Start : + * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + * */ + // @Test + fun testNatsMessagePrioritizationConsumer() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") + + val inputSubject = + NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) + + val natsMessagePrioritizationService = + SampleNatsMessagePrioritizationService(messagePrioritizationStateService) + natsMessagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationConsumer = + NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) + messagePrioritizationConsumer.startConsuming() + + /** Send sample message with every 1 sec */ + val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService + + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(200) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + } + delay(3000) + messagePrioritizationConsumer.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt new file mode 100644 index 000000000..22c399608 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -0,0 +1,65 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Service +import javax.sql.DataSource + +@Configuration +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"]) +@EnableAutoConfiguration +open class TestDatabaseConfiguration { + + @Bean("primaryDBLibGenericService") + open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService { + return PrimaryDBLibGenericService( + NamedParameterJdbcTemplate(dataSource) + ) + } +} + +/* Sample Prioritization Listener, used during Application startup +@Component +open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { + + private val log = logger(SamplePrioritizationListeners::class) + + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + defaultMessagePrioritizationConsumer + .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + + @PreDestroy + open fun destroy() = runBlocking { + log.info("Shutting down PrioritizationListeners...") + defaultMessagePrioritizationConsumer.shutDown() + } +} + */ + +@Service +open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : + SampleMessagePrioritizationService(messagePrioritizationStateService) diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt new file mode 100644 index 000000000..73d3738e5 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -0,0 +1,132 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +class MessageCorrelationUtilsTest { + + @Test + fun testCorrelationKeysReordered() { + + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key2=value2,key1=value1" + ) + + val multipleMessages: MutableList = arrayListOf() + multipleMessages.add(message1) + multipleMessages.add(message2) + val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages) + assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered") + } + + @Test + fun differentTypesWithSameCorrelationMessages() { + /** With Types **/ + /* Assumption is Same group with different types */ + val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) + val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2") + ) + assertTrue( + differentTypesWithSameCorrelationMessagesResponse.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResponse" + ) + + /* Assumption is Same group with different types and one missing expected types, + In this case type-3 message is missing */ + val differentTypesWithSameCorrelationMessagesResWithMissingType = + MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3") + ) + assertTrue( + !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" + ) + } + + @Test + fun withSameCorrelationMessagesWithIgnoredTypes() { + /** With ignoring Types */ + /** Assumption is only one message received */ + val withSameCorrelationOneMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) + val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationOneMessages, null + ) + assertTrue( + !withSameCorrelationOneMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) + + /** Assumption is two message received for same group with same correlation */ + val withSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) + val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationMessages, null + ) + assertTrue( + withSameCorrelationMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) + } + + @Test + fun differentTypesWithDifferentCorrelationMessage() { + /** Assumption is two message received for same group with different expected types and different correlation */ + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-1", "key1=value1,key2=value3" + ) + val differentTypesWithDifferentCorrelationMessage: MutableList = arrayListOf() + differentTypesWithDifferentCorrelationMessage.add(message1) + differentTypesWithDifferentCorrelationMessage.add(message2) + val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithDifferentCorrelationMessage, + arrayListOf("type-0", "type-1") + ) + assertTrue( + !differentTypesWithDifferentCorrelationMessageResp.correlated, + "failed to correlate differentTypesWithDifferentCorrelationMessageResp" + ) + } + + @Test + fun testPrioritizationOrdering() { + val differentPriorityMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5) + val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority() + assertNotNull(orderedPriorityMessages, "failed to order the priority messages") + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml new file mode 100644 index 000000000..e3a1f7a01 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + ${localPattern} + + + + + + + + + + + + + diff --git a/ms/blueprintsprocessor/functions/pom.xml b/ms/blueprintsprocessor/functions/pom.xml index ed3b5c98a..5a59939f0 100755 --- a/ms/blueprintsprocessor/functions/pom.xml +++ b/ms/blueprintsprocessor/functions/pom.xml @@ -40,7 +40,7 @@ restconf-executor cli-executor config-snapshots - message-prioritizaion + message-prioritization k8s-profile-upload diff --git a/ms/blueprintsprocessor/modules/inbounds/configs-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/configs-api/pom.xml index f135e4263..72a16ea53 100644 --- a/ms/blueprintsprocessor/modules/inbounds/configs-api/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/configs-api/pom.xml @@ -28,7 +28,7 @@ configs-api jar - MS Blueprints Processor Modules - Inbound - Configurations API + MS Blueprints Processor Modules - Inbounds - Configurations API diff --git a/ms/blueprintsprocessor/modules/inbounds/designer-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/designer-api/pom.xml index 8e17b0934..1c7e186dd 100644 --- a/ms/blueprintsprocessor/modules/inbounds/designer-api/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/designer-api/pom.xml @@ -28,7 +28,7 @@ designer-api jar - MS Blueprints Processor Modules - Inbound - Designer API + MS Blueprints Processor Modules - Inbounds - Designer API diff --git a/ms/blueprintsprocessor/modules/inbounds/health-api-common/pom.xml b/ms/blueprintsprocessor/modules/inbounds/health-api-common/pom.xml index 2bfcbe38a..ab143ee91 100644 --- a/ms/blueprintsprocessor/modules/inbounds/health-api-common/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/health-api-common/pom.xml @@ -29,7 +29,7 @@ health-api-common jar - MS Blueprints Processor Modules - Inbound - Health API common + MS Blueprints Processor Modules - Inbounds - Health API common diff --git a/ms/blueprintsprocessor/modules/inbounds/health-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/health-api/pom.xml index 99a5529ce..2f217c160 100644 --- a/ms/blueprintsprocessor/modules/inbounds/health-api/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/health-api/pom.xml @@ -29,7 +29,7 @@ health-api jar - MS Blueprints Processor Modules - Inbound - Health API + MS Blueprints Processor Modules - Inbounds - Health API diff --git a/ms/blueprintsprocessor/modules/inbounds/resource-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/resource-api/pom.xml index 54d0acdd4..acf3a3133 100644 --- a/ms/blueprintsprocessor/modules/inbounds/resource-api/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/resource-api/pom.xml @@ -28,7 +28,7 @@ resource-api jar - MS Blueprints Processor Modules - Inbound - Resource API + MS Blueprints Processor Modules - Inbounds - Resource API diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml index 706dd458a..c120820f1 100755 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml @@ -31,7 +31,7 @@ selfservice-api jar - MS Blueprints Processor Modules - Inbound - Self Service API + MS Blueprints Processor Modules - Inbounds - Self Service API diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml index 8e407a3e5..645e0cb0c 100755 --- a/ms/blueprintsprocessor/parent/pom.xml +++ b/ms/blueprintsprocessor/parent/pom.xml @@ -442,7 +442,7 @@ org.onap.ccsdk.cds.blueprintsprocessor.functions - message-prioritizaion + message-prioritization ${ccsdk.cds.version} -- cgit 1.2.3-korg