From 5844724ca96d08c3b752effdb10fd2586755912d Mon Sep 17 00:00:00 2001 From: "Singal, Kapil (ks220y)" Date: Tue, 15 Dec 2020 19:02:17 -0500 Subject: Fixing typo in message-prioritization Refactoring few POMs name tag Issue-ID: CCSDK-3053 Signed-off-by: Singal, Kapil (ks220y) Change-Id: I14447ea7f93efcc970213bbe7d42663cb87e33d7 --- .../functions/message-prioritization/README.md | 26 ++ .../functions/message-prioritization/pom.xml | 43 +++ .../MessagePrioritizationConfiguration.kt | 33 ++ .../prioritization/MessagePrioritizationData.kt | 89 ++++++ .../prioritization/MessagePrioritizationService.kt | 37 +++ .../MessagePrioritizationStateService.kt | 72 +++++ .../prioritization/MessagePrioritizeExtensions.kt | 67 ++++ .../prioritization/api/MessagePrioritizationApi.kt | 80 +++++ .../prioritization/db/MessagePrioritization.kt | 89 ++++++ .../db/PrioritizationMessageRepository.kt | 175 +++++++++++ .../AbstractKafkaMessagePrioritizationService.kt | 84 +++++ .../kafka/AbstractMessagePrioritizeProcessor.kt | 28 ++ .../kafka/DefaultMessagePrioritizeProcessor.kt | 78 +++++ .../kafka/KafkaMessagePrioritizationConsumer.kt | 108 +++++++ .../kafka/MessagePrioritizationSerde.kt | 64 ++++ .../AbstractNatsMessagePrioritizationService.kt | 85 +++++ .../nats/NatsMessagePrioritizationConsumer.kt | 92 ++++++ .../AbstractMessagePrioritizationService.kt | 203 ++++++++++++ .../MessagePrioritizationSchedulerService.kt | 98 ++++++ .../MessagePrioritizationStateServiceImpl.kt | 176 +++++++++++ .../service/SampleMessagePrioritizationService.kt | 120 +++++++ .../utils/MessageCorrelationUtils.kt | 82 +++++ .../utils/MessagePrioritizationSample.kt | 148 +++++++++ .../prioritization/utils/MessageProcessorUtils.kt | 86 +++++ .../MessagePrioritizationConsumerTest.kt | 350 +++++++++++++++++++++ .../message/prioritization/TestConfiguration.kt | 65 ++++ .../utils/MessageCorrelationUtilsTest.kt | 132 ++++++++ .../src/test/resources/logback-test.xml | 42 +++ 28 files changed, 2752 insertions(+) create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/README.md create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/pom.xml create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml (limited to 'ms/blueprintsprocessor/functions/message-prioritization') diff --git a/ms/blueprintsprocessor/functions/message-prioritization/README.md b/ms/blueprintsprocessor/functions/message-prioritization/README.md new file mode 100644 index 000000000..cda43faca --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/README.md @@ -0,0 +1,26 @@ + +To Delete Topics +------------------ +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic +kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog + +Create Topics +-------------- + +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic + +To List topics +---------------- +kafka-topics --list --bootstrap-server localhost:9092 + +To publish message +-------------------- +kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic + +To Listen for Output +---------------------- +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning diff --git a/ms/blueprintsprocessor/functions/message-prioritization/pom.xml b/ms/blueprintsprocessor/functions/message-prioritization/pom.xml new file mode 100644 index 000000000..9b6f3b1c3 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/pom.xml @@ -0,0 +1,43 @@ + + + + + 4.0.0 + + + org.onap.ccsdk.cds.blueprintsprocessor + blueprintsprocessor-functions + 1.1.0-SNAPSHOT + + + org.onap.ccsdk.cds.blueprintsprocessor.functions + message-prioritization + + MS Blueprints Processor Functions - Message Prioritization + Blueprints Processor Function - Message Prioritization + + + + org.onap.ccsdk.cds.blueprintsprocessor.modules + message-lib + + + com.h2database + h2 + + + diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt new file mode 100644 index 000000000..890e0a6ba --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -0,0 +1,33 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +open class MessagePrioritizationConfiguration + +object MessagePrioritizationConstants { + + const val SOURCE_INPUT = "source-prioritization-input" + + const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" + + const val SINK_OUTPUT = "sink-prioritization-output" +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt new file mode 100644 index 000000000..65b7644a8 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -0,0 +1,89 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import java.io.Serializable + +object MessageActionConstants { + + const val PRIORITIZE = "prioritize" +} + +enum class MessageState(val id: String) { + NEW("new"), + WAIT("wait"), + EXPIRED("expired"), + PRIORITIZED("prioritized"), + AGGREGATED("aggregated"), + COMPLETED("completed"), + ERROR("error") +} + +open class PrioritizationConfiguration : Serializable { + + lateinit var expiryConfiguration: ExpiryConfiguration + lateinit var shutDownConfiguration: ShutDownConfiguration + lateinit var cleanConfiguration: CleanConfiguration + var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration + var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration +} + +open class KafkaConfiguration : Serializable { + + lateinit var inputTopicSelector: String // Consumer Configuration Selector + lateinit var expiredTopic: String // Publish Configuration Selector + lateinit var outputTopic: String // Publish Configuration Selector +} + +open class NatsConfiguration : Serializable { + + lateinit var connectionSelector: String // Consumer Configuration Selector + lateinit var inputSubject: String // Publish Configuration Selector + lateinit var expiredSubject: String // Publish Configuration Selector + lateinit var outputSubject: String // Publish Configuration Selector +} + +open class ExpiryConfiguration : Serializable { + + var frequencyMilli: Long = 30000L + var maxPollRecord: Int = 1000 +} + +open class ShutDownConfiguration : Serializable { + + var waitMill: Long = 30000L +} + +open class CleanConfiguration : Serializable { + + var frequencyMilli: Long = 30000L + var expiredRecordsHoldDays: Int = 5 +} + +open class UpdateStateRequest : Serializable { + + lateinit var id: String + var group: String? = null + var state: String? = null +} + +data class CorrelationCheckResponse( + var message: String? = null, + var correlated: Boolean = false +) + +data class TypeCorrelationKey(val type: String, val correlationId: String) diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt new file mode 100644 index 000000000..dfe516953 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -0,0 +1,37 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization + +interface MessagePrioritizationService { + + fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) + + fun getConfiguration(): PrioritizationConfiguration + + suspend fun prioritize(messagePrioritization: MessagePrioritization) + + /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ + suspend fun output(messages: List) + + /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ + suspend fun updateExpiredMessages() + + /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ + suspend fun cleanExpiredMessage() +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt new file mode 100644 index 000000000..2e5e6c617 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt @@ -0,0 +1,72 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import java.util.Date + +interface MessagePrioritizationStateService { + + suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization + + suspend fun getMessage(id: String): MessagePrioritization + + suspend fun getMessages(ids: List): List? + + suspend fun getExpiryEligibleMessages(count: Int): List? + + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? + + suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? + + suspend fun getExpiredMessages(expiryDate: Date, count: Int): List? + + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? + + suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? + + suspend fun updateMessagesState(ids: List, state: String) + + suspend fun updateMessageState(id: String, state: String): MessagePrioritization + + suspend fun setMessageState(id: String, state: String) + + suspend fun setMessagesPriority(ids: List, priority: String) + + suspend fun setMessagesState(ids: List, state: String) + + suspend fun setMessageStateANdError(id: String, state: String, error: String) + + suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) + + suspend fun deleteMessage(id: String) + + suspend fun deleteMessages(id: List) + + suspend fun deleteExpiredMessage(retentionDays: Int) + + suspend fun deleteMessageByGroup(group: String) + + suspend fun deleteMessageStates(group: String, states: List) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt new file mode 100644 index 000000000..d8e71d413 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -0,0 +1,67 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** + * Register the MessagePrioritizationStateService and exposed dependency + */ +fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = + instance(MessagePrioritizationStateService::class) + +/** + * Expose messagePrioritizationStateService to AbstractComponentFunction + */ +fun AbstractComponentFunction.messagePrioritizationStateService() = + BluePrintDependencyService.messagePrioritizationStateService() + +/** + * MessagePrioritization correlation extensions + */ + +/** + * Arrange comma separated correlation keys in ascending order. + */ +fun MessagePrioritization.toFormatedCorrelation(): String { + return this.correlationId!!.split(",") + .map { it.trim() }.sorted().joinToString(",") +} + +/** + * Used to group the correlation with respect to types. + */ +fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { + return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) +} + +/** get list of message ids **/ +fun List.ids(): List { + return this.map { it.id } +} + +/** Ordered by highest priority and updated date **/ +fun List.orderByHighestPriority(): List { + return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate)) +} + +/** Ordered by Updated date **/ +fun List.orderByUpdatedDate(): List { + return this.sortedWith(compareBy(MessagePrioritization::updatedDate)) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt new file mode 100644 index 000000000..c7aab03b6 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -0,0 +1,80 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.mdcWebCoroutineScope +import org.springframework.http.MediaType +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.ResponseBody +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping(value = ["/api/v1/message-prioritization"]) +open class MessagePrioritizationApi( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val messagePrioritizationService: MessagePrioritizationService +) { + + @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + suspend fun ping(): String = mdcWebCoroutineScope { "Success" } + + @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) + @ResponseBody + suspend fun messagePrioritization(@PathVariable(value = "id") id: String) = mdcWebCoroutineScope { + messagePrioritizationStateService.getMessage(id) + } + + @PostMapping( + path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + @ResponseBody + suspend fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = + mdcWebCoroutineScope { + messagePrioritizationStateService.saveMessage(messagePrioritization) + } + + @PostMapping( + path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + @ResponseBody + suspend fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = mdcWebCoroutineScope { + messagePrioritizationService.prioritize(messagePrioritization) + } + + @PostMapping( + path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + suspend fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = + mdcWebCoroutineScope { + messagePrioritizationStateService.setMessageState( + updateMessageState.id, + updateMessageState.state!! + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt new file mode 100644 index 000000000..ce2085f68 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt @@ -0,0 +1,89 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import com.fasterxml.jackson.annotation.JsonFormat +import org.hibernate.annotations.Proxy +import org.springframework.data.annotation.LastModifiedDate +import org.springframework.data.jpa.domain.support.AuditingEntityListener +import org.springframework.data.jpa.repository.config.EnableJpaAuditing +import java.util.Date +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.EntityListeners +import javax.persistence.Id +import javax.persistence.Lob +import javax.persistence.Table +import javax.persistence.Temporal +import javax.persistence.TemporalType + +@EnableJpaAuditing +@EntityListeners(AuditingEntityListener::class) +@Entity +@Table(name = "MESSAGE_PRIORITIZATION") +@Proxy(lazy = false) +open class MessagePrioritization { + + @Id + @Column(name = "message_id", length = 50) + lateinit var id: String + + @Column(name = "message_group", length = 50, nullable = false) + lateinit var group: String + + @Column(name = "message_type", length = 50, nullable = false) + lateinit var type: String + + /** States Defined by MessageState */ + @Column(name = "message_state", length = 20, nullable = false) + lateinit var state: String + + @Column(name = "priority", nullable = false) + var priority: Int = 5 + + @Lob + @Column(name = "message", nullable = false) + var message: String? = null + + @Lob + @Column(name = "error", nullable = true) + var error: String? = null + + @Lob + @Column(name = "aggregated_message_ids", nullable = true) + var aggregatedMessageIds: String? = null + + @Lob + @Column(name = "correlation_id", nullable = true) + var correlationId: String? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "created_date", nullable = false) + var createdDate = Date() + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @LastModifiedDate + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "updated_date", nullable = false) + var updatedDate: Date? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "expiry_date", nullable = false) + var expiryDate: Date? = null +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt new file mode 100644 index 000000000..0b35e3856 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt @@ -0,0 +1,175 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import org.springframework.data.domain.Pageable +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Repository +@Transactional(readOnly = true) +interface PrioritizationMessageRepository : JpaRepository { + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") + fun findByGroup(group: String, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateIn(group: String, states: List, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.updatedDate asc" + ) + fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List, count: Pageable): + List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndNotExpiredDate( + group: String, + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByStateInAndExpiredDate( + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndExpiredDate( + group: String, + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndCorrelationId(group: String, states: List, correlationId: String): + List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndTypesAndCorrelationId( + group: String, + states: List, + types: List, + correlationId: String + ): List? + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateForMessageId(id: String, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setStateForMessageIds(ids: List, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setPriorityForMessageIds(ids: List, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, " + + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" + ) + fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE id IN :ids") + fun deleteByIds(ids: List) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ") + fun deleteByExpiryDate(expiryCheckDate: Date) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE group = :group") + fun deleteGroup(group: String) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states") + fun deleteGroupAndStateIn(group: String, states: List) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt new file mode 100644 index 000000000..112a80379 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt @@ -0,0 +1,84 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractKafkaMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractKafkaMessagePrioritizationService::class) + + lateinit var processorContext: ProcessorContext + + fun setKafkaProcessorContext(processorContext: ProcessorContext) { + this.processorContext = processorContext + } + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + processorContext.forward( + updatedMessage.id, + updatedMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } + check(::processorContext.isInitialized) { "failed to initialize kafka processor " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + processorContext.forward( + expiredMessage.id, expiredMessage, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..d4f8470c8 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -0,0 +1,28 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor + +/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ +abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { + + override fun init(processorContext: ProcessorContext) { + this.processorContext = processorContext + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..1b0612492 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -0,0 +1,78 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils + +open class DefaultMessagePrioritizeProcessor( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService +) : AbstractMessagePrioritizeProcessor() { + + private val log = logger(DefaultMessagePrioritizeProcessor::class) + + override suspend fun processNB(key: ByteArray, value: ByteArray) { + + val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + try { + kafkaMessagePrioritizationService.prioritize(messagePrioritize) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + /** Publish to Output topic */ + this.processorContext.forward( + messagePrioritize.id, messagePrioritize, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override fun init(context: ProcessorContext) { + super.init(context) + /** Set Configuration and Processor Context to messagePrioritizationService */ + if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { + kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) + } else { + throw BluePrintProcessorException( + "messagePrioritizationService is not instance of " + + "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" + ) + } + } + + override fun close() { + log.info( + "closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..4ab399f54 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt @@ -0,0 +1,108 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList + +open class KafkaMessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val kafkaMessagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(KafkaMessagePrioritizationConsumer::class) + + private lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + kafkaConsumerConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + + val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration + ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") + + streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (::streamingConsumerService.isInitialized) { + streamingConsumerService.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt new file mode 100644 index 000000000..5595863d4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.nio.charset.Charset + +open class MessagePrioritizationSerde : Serde { + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer { + return object : Deserializer { + override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { + return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer { + return object : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt new file mode 100644 index 000000000..502a7822d --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt @@ -0,0 +1,85 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +abstract class AbstractNatsMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(AbstractNatsMessagePrioritizationService::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject + messages.forEach { message -> + val updatedMessage = + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + updatedMessage.asJsonType().asByteArray() + ) + } + } + + override suspend fun updateExpiredMessages() { + checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } + check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expiredMessage -> + expiredMessage.state = MessageState.EXPIRED.name + /** send to the output subject */ + bluePrintNatsService.publish( + NatsClusterUtils.currentApplicationSubject(outputSubject), + expiredMessage.asJsonType().asByteArray() + ) + } + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt new file mode 100644 index 000000000..a0b2cf462 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt @@ -0,0 +1,92 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats + +import io.nats.streaming.MessageHandler +import io.nats.streaming.Subscription +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils + +open class NatsMessagePrioritizationConsumer( + private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, + private val natsMessagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(NatsMessagePrioritizationConsumer::class) + + lateinit var bluePrintNatsService: BluePrintNatsService + private lateinit var subscription: Subscription + + suspend fun startConsuming() { + val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() + val natsConfiguration = prioritizationConfiguration.natsConfiguration + ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") + + check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { + "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." + } + bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) + natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService + val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) + val loadBalanceGroup = ClusterUtils.applicationName() + val messageHandler = createMessageHandler() + val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) + subscription = bluePrintNatsService.loadBalanceSubscribe( + inputSubject, + loadBalanceGroup, + messageHandler, + subscriptionOptions + ) + log.info( + "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." + ) + } + + suspend fun shutDown() { + if (::subscription.isInitialized) { + subscription.unsubscribe() + } + log.info("Nats prioritization consumer listener shutdown complete") + } + + private fun consumerService(selector: String): BluePrintNatsService { + return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) + } + + private fun createMessageHandler(): MessageHandler { + return MessageHandler { message -> + try { + val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) + runBlocking { + natsMessagePrioritizationService.prioritize(messagePrioritization) + } + } catch (e: Exception) { + log.error("failed to process prioritize message", e) + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt new file mode 100644 index 000000000..f4602a810 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -0,0 +1,203 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/ +abstract class AbstractMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : MessagePrioritizationService { + + private val log = logger(AbstractMessagePrioritizationService::class) + + lateinit var prioritizationConfiguration: PrioritizationConfiguration + + override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { + this.prioritizationConfiguration = prioritizationConfiguration + } + + override fun getConfiguration(): PrioritizationConfiguration { + return this.prioritizationConfiguration + } + + override suspend fun prioritize(messagePrioritize: MessagePrioritization) { + try { + log.info("***** received in prioritize processor key(${messagePrioritize.id})") + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + /** Get the cluster lock for message group */ + val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) + // Save the Message + messagePrioritizationStateService.saveMessage(messagePrioritize) + handleCorrelationAndNextStep(messagePrioritize) + /** Cluster unLock for message group */ + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + } + } + + override suspend fun output(messages: List) { + log.info("$$$$$ received in output processor id(${messages.ids()})") + messages.forEach { message -> + messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) + } + } + + override suspend fun updateExpiredMessages() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() + try { + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + val expiredIds = fetchMessages?.ids() + if (!expiredIds.isNullOrEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + } + } catch (e: Exception) { + log.error("failed in updating expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } + + override suspend fun cleanExpiredMessage() { + check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } + + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + val clusterLock = MessageProcessorUtils.prioritizationCleanLock() + try { + messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) + } catch (e: Exception) { + log.error("failed in clean expired messages", e) + } finally { + MessageProcessorUtils.prioritizationUnLock(clusterLock) + } + } + + open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { + /** Check correlation enabled and correlation field has populated */ + if (!messagePrioritization.correlationId.isNullOrBlank()) { + val id = messagePrioritization.id + val group = messagePrioritization.group + val correlationId = messagePrioritization.correlationId!! + val types = getGroupCorrelationTypes(messagePrioritization) + log.info( + "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " + + "correlation types($types), priority(${messagePrioritization.priority}), " + + "correlation id($correlationId)" + ) + + /** Get all previously received messages from database for group and optional types and correlation Id */ + val waitingCorrelatedStoreMessages = messagePrioritizationStateService + .getCorrelatedMessages( + group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId + ) + + /** If multiple records found, then check correlation */ + if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { + /** Check all correlation satisfies */ + val correlationResults = MessageCorrelationUtils + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + + if (correlationResults.correlated) { + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState( + waitingCorrelatedStoreMessages.ids(), + MessageState.PRIORITIZED.name + ) + /** Correlation satisfied, Send only correlated messages to aggregate processor */ + aggregate(waitingCorrelatedStoreMessages) + } else { + /** Correlation not satisfied */ + log.trace("correlation not matched : ${correlationResults.message}") + // Update the Message state to Wait + messagePrioritizationStateService.setMessagesState( + waitingCorrelatedStoreMessages.ids(), + MessageState.WAIT.name + ) + } + } else { + /** received first message of group and correlation Id, update the message with wait state */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) + } + } else { + /** No Correlation check needed, simply forward to next processor. */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) + aggregate(arrayListOf(messagePrioritization)) + } + } + + open suspend fun aggregate(messages: List) { + log.info("@@@@@ received in aggregation processor ids(${messages.ids()}") + if (!messages.isNullOrEmpty()) { + try { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(messages) + } catch (e: Exception) { + val error = "failed in aggregate message(${messages.ids()}) : ${e.message}" + if (!messages.isNullOrEmpty()) { + messages.forEach { messagePrioritization -> + try { + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritization.id, + MessageState.ERROR.name, error + ) + } catch (sendException: Exception) { + log.error( + "failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", + e + ) + } + } + /** Publish to output topic */ + output(messages) + } + } + } + } + + /** Child will override this implementation , if necessary + * Here the place child has to implement custom Sequencing and Aggregation logic. + * */ + abstract suspend fun handleAggregation(messages: List) + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt new file mode 100644 index 000000000..529d773a4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.stereotype.Service + +@Service +open class MessagePrioritizationSchedulerService( + private val messagePrioritizationService: MessagePrioritizationService +) { + + private val log = logger(MessagePrioritizationSchedulerService::class) + + @Volatile + var keepGoing = true + + /** This is sample scheduler implementation used during starting application with configuration. + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + */ + + open suspend fun startScheduling() { + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + + log.info("Starting Prioritization Scheduler Service...") + GlobalScope.launch { + expiryScheduler(prioritizationConfiguration) + } + GlobalScope.launch { + cleanUpScheduler(prioritizationConfiguration) + } + } + + open suspend fun shutdownScheduling() { + keepGoing = false + val prioritizationConfiguration = messagePrioritizationService.getConfiguration() + delay(prioritizationConfiguration.shutDownConfiguration.waitMill) + } + + private suspend fun expiryScheduler( + prioritizationConfiguration: PrioritizationConfiguration + ) { + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec") + withContext(Dispatchers.Default) { + while (keepGoing) { + try { + messagePrioritizationService.updateExpiredMessages() + delay(expiryConfiguration.frequencyMilli) + } catch (e: Exception) { + log.error("failed in prioritization expiry scheduler", e) + } + } + } + } + + private suspend fun cleanUpScheduler( + prioritizationConfiguration: PrioritizationConfiguration + ) { + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec") + withContext(Dispatchers.Default) { + while (keepGoing) { + try { + messagePrioritizationService.cleanExpiredMessage() + delay(cleanConfiguration.frequencyMilli) + } catch (e: Exception) { + log.error("failed in prioritization clean scheduler", e) + } + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt new file mode 100644 index 000000000..ed16fd44f --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -0,0 +1,176 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate +import org.springframework.data.domain.PageRequest +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Service +open class MessagePrioritizationStateServiceImpl( + private val prioritizationMessageRepository: PrioritizationMessageRepository +) : MessagePrioritizationStateService { + + private val log = logger(MessagePrioritizationStateServiceImpl::class) + + @Transactional + override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { + if (!message.correlationId.isNullOrBlank()) { + message.correlationId = message.toFormatedCorrelation() + } + message.updatedDate = Date() + return prioritizationMessageRepository.save(message) + } + + override suspend fun getMessage(id: String): MessagePrioritization { + return prioritizationMessageRepository.findById(id).orElseGet(null) + ?: throw BluePrintProcessorException("couldn't find message for id($id)") + } + + override suspend fun getMessages(ids: List): List? { + return prioritizationMessageRepository.findAllById(ids) + } + + override suspend fun getExpiryEligibleMessages(count: Int): List? { + return prioritizationMessageRepository + .findByStateInAndExpiredDate( + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List? { + return prioritizationMessageRepository.findByExpiredDate( + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndExpiredDate( + group, + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? { + return if (!types.isNullOrEmpty()) { + prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) + } else { + prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) + } + } + + @Transactional + override suspend fun updateMessagesState(ids: List, state: String) { + ids.forEach { + val updated = updateMessageState(it, state) + log.info("message($it) update to state(${updated.state})") + } + } + + @Transactional + override suspend fun setMessageState(id: String, state: String) { + prioritizationMessageRepository.setStateForMessageId(id, state, Date()) + } + + @Transactional + override suspend fun setMessagesPriority(ids: List, priority: String) { + prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date()) + } + + @Transactional + override suspend fun setMessagesState(ids: List, state: String) { + prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) + } + + @Transactional + override suspend fun setMessageStateANdError(id: String, state: String, error: String) { + prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) + } + + @Transactional + override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + } + return saveMessage(updateMessage) + } + + @Transactional + override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) { + val groupedIds = aggregatedIds.joinToString(",") + prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) + } + + override suspend fun deleteMessage(id: String) { + prioritizationMessageRepository.deleteById(id) + log.info("Prioritization Messages $id deleted successfully.") + } + + override suspend fun deleteMessages(ids: List) { + prioritizationMessageRepository.deleteByIds(ids) + log.info("Prioritization Messages $ids deleted successfully.") + } + + override suspend fun deleteExpiredMessage(retentionDays: Int) { + val expiryCheckDate = controllerDate().addDate(retentionDays) + prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate) + } + + override suspend fun deleteMessageByGroup(group: String) { + prioritizationMessageRepository.deleteGroup(group) + log.info("Prioritization Messages group($group) deleted successfully.") + } + + override suspend fun deleteMessageStates(group: String, states: List) { + prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + log.info("Prioritization Messages group($group) with states($states) deleted successfully.") + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt new file mode 100644 index 000000000..305e64ba4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -0,0 +1,120 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** Sample Prioritization Service, Define spring service injector to register in application*/ +open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messages: List) { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + sampleMessagePrioritizationHandler.handleAggregation(messages) + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( + this, messagePrioritizationStateService + ) + return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) + } +} + +class SampleMessagePrioritizationHandler( + private val messagePrioritizationService: MessagePrioritizationService, + private val messagePrioritizationStateService: MessagePrioritizationStateService +) { + + private val log = logger(SampleMessagePrioritizationHandler::class) + + suspend fun handleAggregation(messages: List) { + log.info("messages(${messages.ids()}) aggregated") + /** Sequence based on Priority and Updated Date */ + val sequencedMessage = messages.orderByHighestPriority() + /** Update all messages to aggregated state */ + messagePrioritizationStateService.setMessagesState( + sequencedMessage.ids(), + MessageState.AGGREGATED.name + ) + messagePrioritizationService.output(sequencedMessage) + } + + fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + return when (messagePrioritization.group) { + /** Dummy Implementation, This can also be read from file and stored as cached map **/ + "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + "pass-typed" -> arrayListOf(messagePrioritization.type) + else -> null + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt new file mode 100644 index 000000000..7ab0be098 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +object MessageCorrelationUtils { + + /** Assumption is message is of same group **/ + fun correlatedMessages(collectedMessages: List): CorrelationCheckResponse { + val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated") + if (collectedMessages.size > 1) { + val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() } + if (filteredMessage.isNotEmpty()) { + val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() } + if (groupedMessage.size == 1) { + correlationCheckResponse.correlated = true + correlationCheckResponse.message = null + } + } + } else { + correlationCheckResponse.message = "received only one message for that group" + } + return correlationCheckResponse + } + + /** Assumption is message is of same group and checking for required types **/ + fun correlatedMessagesWithTypes(collectedMessages: List, types: List?): + CorrelationCheckResponse { + + return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { + + val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } + if (!unknownMessageTypes.isNullOrEmpty()) { + throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") + } + + val copyTypes = types.toTypedArray().copyOf().toMutableList() + + val filteredMessage = collectedMessages.filter { + !it.correlationId.isNullOrBlank() && + types.contains(it.type) + } + var correlatedKeys: MutableSet = mutableSetOf() + if (filteredMessage.isNotEmpty()) { + val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } + val foundType = correlatedMap.keys.map { it.type } + copyTypes.removeAll(foundType) + correlatedKeys = correlatedMap.keys.map { + it.correlationId + }.toMutableSet() + } + /** Check if any Types missing and same correlation id for all types */ + return if (copyTypes.isEmpty()) { + if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) + else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") + } else { + CorrelationCheckResponse(message = "couldn't find types($copyTypes)") + } + } else { + return correlatedMessages(collectedMessages) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt new file mode 100644 index 000000000..2c4ae30da --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -0,0 +1,148 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate +import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate +import java.util.Date +import java.util.UUID + +object MessagePrioritizationSample { + + fun samplePrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + kafkaConfiguration = KafkaConfiguration().apply { + inputTopicSelector = "prioritize-input" + outputTopic = "prioritize-output-topic" + expiredTopic = "prioritize-expired-topic" + } + natsConfiguration = NatsConfiguration().apply { + connectionSelector = "cds-controller" + inputSubject = "prioritize-input" + outputSubject = "prioritize-output" + expiredSubject = "prioritize-expired" + } + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10000L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 2000L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10000L + expiredRecordsHoldDays = 5 + } + } + } + + fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 20L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10L + expiredRecordsHoldDays = 5 + } + } + } + + private fun currentDatePlusDays(days: Int): Date { + return controllerDate().addDate(days) + } + + fun sampleMessages(messageState: String, count: Int): List { + return sampleMessages("sample-group", messageState, count) + } + + fun sampleMessages(groupName: String, messageState: String, count: Int): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage( + groupName, messageState, + "sample-type", null + ) + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage( + groupName, messageState, "sample-type", + "key1=value1,key2=value2" + ) + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithDifferentTypeSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage( + groupName, messageState, "type-$it", + "key1=value1,key2=value2" + ) + messages.add(backPressureMessage) + } + return messages + } + + fun createMessage( + groupName: String, + messageState: String, + messageType: String, + messageCorrelationId: String? + ): MessagePrioritization { + + return MessagePrioritization().apply { + id = UUID.randomUUID().toString() + group = groupName + type = messageType + state = messageState + priority = (1..10).shuffled().first() + correlationId = messageCorrelationId + message = "I am the Message" + createdDate = Date() + updatedDate = Date() + expiryDate = currentDatePlusDays(3) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt new file mode 100644 index 000000000..86cec3697 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -0,0 +1,86 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +object MessageProcessorUtils { + + /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ + suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + + return if (clusterService != null && clusterService.clusterJoined() && + !messagePrioritization.correlationId.isNullOrBlank() + ) { + // Get the correlation key in ascending order, even it it is misplaced + val correlationId = messagePrioritization.toFormatedCorrelation() + val lockName = "prioritize::${messagePrioritization.group}::$correlationId" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility to create the cluster lock for expiry scheduler*/ + suspend fun prioritizationExpiryLock(): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritize-expiry" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility to create the cluster lock for expiry scheduler*/ + suspend fun prioritizationCleanLock(): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined()) { + val lockName = "prioritize-clean" + val clusterLock = clusterService.clusterLock(lockName) + clusterLock.lock() + if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") + clusterLock + } else null + } + + /** Utility used to cluster unlock for message [clusterLock] */ + suspend fun prioritizationUnLock(clusterLock: ClusterLock?) { + if (clusterLock != null) { + clusterLock.unLock() + clusterLock.close() + } + } + + /** Get the Kafka Supplier for processor lookup [name] **/ + fun bluePrintProcessorSupplier(name: String): ProcessorSupplier { + return ProcessorSupplier { + // Dynamically resolve the Prioritization Processor + BluePrintDependencyService.instance>(name) + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt new file mode 100644 index 000000000..286a9b5c1 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -0,0 +1,350 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import io.mockk.coEvery +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils +import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.context.ApplicationContext +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.Test +import kotlin.test.assertNotNull + +@RunWith(SpringRunner::class) +@DataJpaTest +@DirtiesContext +@ContextConfiguration( + classes = [ + BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class + ] +) +@TestPropertySource( + properties = + [ + "spring.jpa.show-sql=false", + "spring.jpa.properties.hibernate.show_sql=false", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword", + "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword", + "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user", + "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", + "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks", + "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword", + "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks", + "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword", + "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user", + "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword", + + "blueprintsprocessor.nats.cds-controller.type=token-auth", + "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", + "blueprintsprocessor.nats.cds-controller.token=tokenAuth" + ] +) +open class MessagePrioritizationConsumerTest { + + private val log = logger(MessagePrioritizationConsumerTest::class) + + @Autowired + lateinit var applicationContext: ApplicationContext + + @Autowired + lateinit var prioritizationMessageRepository: PrioritizationMessageRepository + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Autowired + lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService + + @Autowired + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + + @Before + fun setup() { + BluePrintDependencyService.inject(applicationContext) + } + + @Test + fun testBluePrintKafkaJDBCKeyStore() { + runBlocking { + assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") + + val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService + .instance(MessagePrioritizationStateService::class) + assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") + + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { + val message = messagePrioritizationService.saveMessage(it) + val repoResult = messagePrioritizationService.getMessage(message.id) + assertNotNull(repoResult, "failed to get inserted message.") + } + } + } + + @Test + fun testMessagePrioritizationService() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + log.info("**************** without Correlation **************") + /** Checking without correlation */ + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + messagePrioritizationService.prioritize(it) + } + log.info("**************** Same Group , with Correlation **************") + /** checking same group with correlation */ + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + log.info("**************** Different Type , with Correlation **************") + /** checking different type, with correlation */ + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + } + } + + @Test + fun testStartConsuming() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + + val streamingConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector) + assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") + + val spyStreamingConsumerService = spyk(streamingConsumerService) + coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit + coEvery { spyStreamingConsumerService.shutDown() } returns Unit + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, mockk() + ) + val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) + + // Test Topology + val kafkaStreamConsumerFunction = + spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) + assertNotNull(topology, "failed to get create topology") + + every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService + spyMessagePrioritizationConsumer.startConsuming(configuration) + spyMessagePrioritizationConsumer.shutDown() + } + } + + @Test + fun testSchedulerService() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val messagePrioritizationService = + SampleMessagePrioritizationService(messagePrioritizationStateService) + messagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationSchedulerService = + MessagePrioritizationSchedulerService(messagePrioritizationService) + launch { + messagePrioritizationSchedulerService.startScheduling() + } + launch { + /** To debug increase the delay time */ + delay(20) + messagePrioritizationSchedulerService.shutdownScheduling() + } + } + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + // @Test + fun testKafkaMessagePrioritizationConsumer() { + runBlocking { + + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + val kafkaMessagePrioritizationService = + SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) + kafkaMessagePrioritizationService.setConfiguration(configuration) + + val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( + messagePrioritizationStateService, + kafkaMessagePrioritizationService + ) + + // Register the processor + BluePrintDependencyService.registerSingleton( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + defaultMessagePrioritizeProcessor + ) + + val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( + bluePrintMessageLibPropertyService, + kafkaMessagePrioritizationService + ) + messagePrioritizationConsumer.startConsuming(configuration) + + /** Send sample message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + key = "mykey", + message = it.asJsonString(false), + headers = headers + ) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + key = "mykey", + message = it.asJsonString(false), + headers = headers + ) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(2000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + key = "mykey", + message = it.asJsonString(false), + headers = headers + ) + } + } + delay(10000) + messagePrioritizationConsumer.shutDown() + } + } + + /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker + * Start : + * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V + * */ + // @Test + fun testNatsMessagePrioritizationConsumer() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") + + val inputSubject = + NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) + + val natsMessagePrioritizationService = + SampleNatsMessagePrioritizationService(messagePrioritizationStateService) + natsMessagePrioritizationService.setConfiguration(configuration) + + val messagePrioritizationConsumer = + NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) + messagePrioritizationConsumer.startConsuming() + + /** Send sample message with every 1 sec */ + val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService + + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(200) + bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) + } + } + delay(3000) + messagePrioritizationConsumer.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt new file mode 100644 index 000000000..22c399608 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -0,0 +1,65 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Service +import javax.sql.DataSource + +@Configuration +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"]) +@EnableAutoConfiguration +open class TestDatabaseConfiguration { + + @Bean("primaryDBLibGenericService") + open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService { + return PrimaryDBLibGenericService( + NamedParameterJdbcTemplate(dataSource) + ) + } +} + +/* Sample Prioritization Listener, used during Application startup +@Component +open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { + + private val log = logger(SamplePrioritizationListeners::class) + + @EventListener(ApplicationReadyEvent::class) + open fun init() = runBlocking { + log.info("Starting PrioritizationListeners...") + defaultMessagePrioritizationConsumer + .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + } + + @PreDestroy + open fun destroy() = runBlocking { + log.info("Shutting down PrioritizationListeners...") + defaultMessagePrioritizationConsumer.shutDown() + } +} + */ + +@Service +open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : + SampleMessagePrioritizationService(messagePrioritizationStateService) diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt new file mode 100644 index 000000000..73d3738e5 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -0,0 +1,132 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +class MessageCorrelationUtilsTest { + + @Test + fun testCorrelationKeysReordered() { + + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key2=value2,key1=value1" + ) + + val multipleMessages: MutableList = arrayListOf() + multipleMessages.add(message1) + multipleMessages.add(message2) + val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages) + assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered") + } + + @Test + fun differentTypesWithSameCorrelationMessages() { + /** With Types **/ + /* Assumption is Same group with different types */ + val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) + val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2") + ) + assertTrue( + differentTypesWithSameCorrelationMessagesResponse.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResponse" + ) + + /* Assumption is Same group with different types and one missing expected types, + In this case type-3 message is missing */ + val differentTypesWithSameCorrelationMessagesResWithMissingType = + MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3") + ) + assertTrue( + !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" + ) + } + + @Test + fun withSameCorrelationMessagesWithIgnoredTypes() { + /** With ignoring Types */ + /** Assumption is only one message received */ + val withSameCorrelationOneMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) + val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationOneMessages, null + ) + assertTrue( + !withSameCorrelationOneMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) + + /** Assumption is two message received for same group with same correlation */ + val withSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) + val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationMessages, null + ) + assertTrue( + withSameCorrelationMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) + } + + @Test + fun differentTypesWithDifferentCorrelationMessage() { + /** Assumption is two message received for same group with different expected types and different correlation */ + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-1", "key1=value1,key2=value3" + ) + val differentTypesWithDifferentCorrelationMessage: MutableList = arrayListOf() + differentTypesWithDifferentCorrelationMessage.add(message1) + differentTypesWithDifferentCorrelationMessage.add(message2) + val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithDifferentCorrelationMessage, + arrayListOf("type-0", "type-1") + ) + assertTrue( + !differentTypesWithDifferentCorrelationMessageResp.correlated, + "failed to correlate differentTypesWithDifferentCorrelationMessageResp" + ) + } + + @Test + fun testPrioritizationOrdering() { + val differentPriorityMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5) + val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority() + assertNotNull(orderedPriorityMessages, "failed to order the priority messages") + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml new file mode 100644 index 000000000..e3a1f7a01 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/resources/logback-test.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + ${localPattern} + + + + + + + + + + + + + -- cgit 1.2.3-korg