From ada372a56d25b41e5a0926a18bf911d20102810f Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 11 Nov 2019 19:35:39 -0500 Subject: Add message prioritization module Kafka streams based solution for message prioritization using database store. Implement initial Abstract Processors, Puntuations and sample Topology for easy plug and play based on situations Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh Change-Id: I9c135604574cc3c642186545e076d6a7c60048d4 --- .../functions/message-prioritizaion/README.txt | 28 ++++ .../functions/message-prioritizaion/pom.xml | 43 +++++ .../prioritization/AbstractTopologyComponents.kt | 41 +++++ .../MessagePrioritizationConfiguration.kt | 37 +++++ .../MessagePrioritizationConsumer.kt | 101 ++++++++++++ .../prioritization/MessagePrioritizationData.kt | 69 ++++++++ .../prioritization/MessagePrioritizeExtensions.kt | 44 ++++++ .../db/MessagePrioritizationRepositories.kt | 98 ++++++++++++ .../db/PrioritizationMessageEntity.kt | 74 +++++++++ .../service/MessagePrioritizationStateService.kt | 172 ++++++++++++++++++++ .../topology/MessageAggregateProcessor.kt | 54 +++++++ .../topology/MessageOutputProcessor.kt | 35 +++++ .../topology/MessagePrioritizationPunctuators.kt | 64 ++++++++ .../topology/MessagePrioritizationSerdes.kt | 64 ++++++++ .../topology/MessagePrioritizeProcessor.kt | 138 ++++++++++++++++ .../utils/MessageCorrelationUtils.kt | 82 ++++++++++ .../utils/MessagePrioritizationSample.kt | 103 ++++++++++++ .../MessagePrioritizationConsumerTest.kt | 175 +++++++++++++++++++++ .../message/prioritization/TestConfiguration.kt | 57 +++++++ .../utils/MessageCorrelationUtilsTest.kt | 98 ++++++++++++ .../src/test/resources/logback-test.xml | 42 +++++ ms/blueprintsprocessor/functions/pom.xml | 1 + .../kafka/AbstractKafkaTopologyComponents.kt | 65 ++++++++ .../message/kafka/KafkaJDBCStores.kt | 143 +++++++++++++++++ .../KafkaStreamsBasicAuthConsumerService.kt | 1 + 25 files changed, 1829 insertions(+) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/README.txt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt new file mode 100644 index 000000000..baf168767 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt @@ -0,0 +1,28 @@ + +To Delete Topics +------------------ +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic +kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog + +Create Topics +-------------- + +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic + +To List topics +---------------- +kafka-topics --list --bootstrap-server localhost:9092 + + +To Listen for Output +---------------------- +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml new file mode 100644 index 000000000..ac46b3635 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml @@ -0,0 +1,43 @@ + + + + + 4.0.0 + + + org.onap.ccsdk.cds.blueprintsprocessor + functions + 0.7.0-SNAPSHOT + + + org.onap.ccsdk.cds.blueprintsprocessor.functions + message-prioritizaion + + Blueprints Processor Function - Message Prioritization + Blueprints Processor Function - Message Prioritization + + + + org.onap.ccsdk.cds.blueprintsprocessor + message-lib + + + com.h2database + h2 + + + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt new file mode 100644 index 000000000..d89f71364 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt @@ -0,0 +1,41 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */ +abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { + + private val log = logger(AbstractMessagePrioritizeProcessor::class) + + lateinit var prioritizationConfiguration: PrioritizationConfiguration + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + + override fun init(context: ProcessorContext) { + this.processorContext = context + /** Get the State service to update in store */ + this.messagePrioritizationStateService = BluePrintDependencyService + .instance(MessagePrioritizationStateService::class) + + } + +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt new file mode 100644 index 000000000..cce883c91 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -0,0 +1,37 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +open class MessagePrioritizationConfiguration + + +object MessagePrioritizationConstants { + + const val SOURCE_INPUT = "source-prioritization-input" + + const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" + const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate" + const val PROCESSOR_OUTPUT = "processor-prioritization-output" + + const val SINK_OUTPUT = "sink-prioritization-output" + const val SINK_EXPIRED = "sink-prioritization-expired" +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt new file mode 100644 index 000000000..ef9d5a058 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -0,0 +1,101 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +open class MessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) { + + private val log = logger(MessagePrioritizationConsumer::class) + + lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration) + : KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map?): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + prioritizationConfiguration), + MessagePrioritizationConstants.SOURCE_INPUT) + + topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + bluePrintProcessorSupplier(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + prioritizationConfiguration), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) + + topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT, + bluePrintProcessorSupplier(MessagePrioritizationConstants.PROCESSOR_OUTPUT, + prioritizationConfiguration), + MessagePrioritizationConstants.PROCESSOR_AGGREGATE) + + topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED, + prioritizationConfiguration.expiredTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) + + topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT, + prioritizationConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_OUTPUT) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (streamingConsumerService != null) { + streamingConsumerService.shutDown() + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt new file mode 100644 index 000000000..d874cef92 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -0,0 +1,69 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import java.io.Serializable + +object MessageActionConstants { + const val PRIORITIZE = "prioritize" +} + +enum class MessageState(val id: String) { + NEW("new"), + WAIT("wait"), + EXPIRED("expired"), + PRIORITIZED("prioritized"), + AGGREGATED("aggregated"), + IGNORED("ignored"), + COMPLETED("completed"), +} + +open class PrioritizationConfiguration : Serializable { + lateinit var expiryConfiguration: ExpiryConfiguration + lateinit var shutDownConfiguration: ShutDownConfiguration + lateinit var cleanConfiguration: CleanConfiguration + lateinit var inputTopicSelector: String // Consumer Configuration Selector + lateinit var expiredTopic: String // Publish Configuration Selector + lateinit var outputTopic: String // Publish Configuration Selector +} + +open class ExpiryConfiguration : Serializable { + var frequencyMilli: Long = 30000L + var maxPollRecord: Int = 1000 +} + +open class ShutDownConfiguration : Serializable { + var waitMill: Long = 30000L +} + +open class CleanConfiguration : Serializable { + var frequencyMilli: Long = 30000L + var expiredRecordsHoldDays: Int = 5 +} + +open class UpdateStateRequest : Serializable { + lateinit var id: String + var group: String? = null + var state: String? = null + var notifyMessage: String? = null +} + +data class CorrelationCheckResponse(var message: String? = null, + var correlated: Boolean = false) + +data class TypeCorrelationKey(val type: String, val correlationId: String) + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt new file mode 100644 index 000000000..94fedf4df --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -0,0 +1,44 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + + +fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) + : ProcessorSupplier { + return ProcessorSupplier { + // Dynamically resolve the Prioritization Processor + val processorInstance = BluePrintDependencyService.instance>(name) + processorInstance.prioritizationConfiguration = prioritizationConfiguration + processorInstance + } +} + +fun MessagePrioritization.toFormatedCorrelation(): String { + val ascendingKey = this.correlationId!!.split(",") + .map { it.trim() }.sorted().joinToString(",") + return ascendingKey +} + +fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { + val ascendingKey = this.correlationId!!.split(",") + .map { it.trim() }.sorted().joinToString(",") + return TypeCorrelationKey(this.type, ascendingKey) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt new file mode 100644 index 000000000..307d932a9 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import org.springframework.data.domain.Pageable +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.* + +@Repository +@Transactional(readOnly = true) +interface PrioritizationMessageRepository : JpaRepository { + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") + fun findByGroup(group: String, count: Pageable): List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.createdDate asc") + fun findByGroupAndStateIn(group: String, states: List, count: Pageable): List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.updatedDate asc") + fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List, count: Pageable) + : List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List, expiryCheckDate: Date, + count: Pageable): List? + + @Query("FROM MessagePrioritization pm WHERE pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByStateInAndExpiredDate(states: List, expiryCheckDate: Date, + count: Pageable): List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByGroupAndStateInAndExpiredDate(group: String, states: List, expiryCheckDate: Date, + count: Pageable): List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") + fun findByGroupAndCorrelationId(group: String, states: List, correlationId: String) + : List? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") + fun findByGroupAndTypesAndCorrelationId(group: String, states: List, types: List, + correlationId: String): List? + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id") + fun setStatusForMessageId(id: String, state: String): Int + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids") + fun setStatusForMessageIds(ids: List, state: String): Int + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " + + "WHERE pm.id = :id") + fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") + fun deleteGroup(group: String) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") + fun deleteGroupAndStateIn(group: String, states: List) +} + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt new file mode 100644 index 000000000..4973cdf6e --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt @@ -0,0 +1,74 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import com.fasterxml.jackson.annotation.JsonFormat +import org.hibernate.annotations.Proxy +import org.springframework.data.annotation.LastModifiedDate +import org.springframework.data.jpa.domain.support.AuditingEntityListener +import org.springframework.data.jpa.repository.config.EnableJpaAuditing +import java.util.* +import javax.persistence.* + +@EnableJpaAuditing +@EntityListeners(AuditingEntityListener::class) +@Entity +@Table(name = "MESSAGE_PRIORITIZATION") +@Proxy(lazy = false) +open class MessagePrioritization { + @Id + @Column(name = "message_id", length = 50) + lateinit var id: String + + @Column(name = "message_group", length = 50, nullable = false) + lateinit var group: String + + @Column(name = "message_type", length = 50, nullable = false) + lateinit var type: String + + /** States Defined by MessageState */ + @Column(name = "message_state", length = 20, nullable = false) + lateinit var state: String + + @Lob + @Column(name = "message", nullable = false) + var message: String? = null + + @Lob + @Column(name = "aggregated_message_ids", nullable = true) + var aggregatedMessageIds: String? = null + + @Lob + @Column(name = "correlation_id", nullable = true) + var correlationId: String? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "created_date", nullable = false) + var createdDate = Date() + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @LastModifiedDate + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "updated_date", nullable = false) + var updatedDate: Date? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "expiry_date", nullable = false) + var expiryDate: Date? = null +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt new file mode 100644 index 000000000..e4369fc20 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt @@ -0,0 +1,172 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.data.domain.PageRequest +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.util.* + +interface MessagePrioritizationStateService { + + suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization + + suspend fun getMessage(id: String): MessagePrioritization + + suspend fun getExpiryEligibleMessages(count: Int): List? + + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): List? + + suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): List? + + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? + + suspend fun getCorrelatedMessages(group: String, states: List, types: List?, correlationIds: String): List? + + suspend fun updateMessagesState(ids: List, state: String) + + suspend fun updateMessageState(id: String, state: String): MessagePrioritization + + suspend fun setMessageState(id: String, state: String) + + suspend fun setMessagesState(ids: List, state: String) + + suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List): MessagePrioritization + + suspend fun deleteMessage(id: String) + + suspend fun deleteMessageByGroup(group: String) + + suspend fun deleteMessageStates(group: String, states: List) + + suspend fun deleteExpiredMessage(group: String, retentionDays: Int) +} + +@Service +open class MessagePrioritizationStateServiceImpl( + private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService { + + private val log = logger(MessagePrioritizationStateServiceImpl::class) + + @Transactional + override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { + if (!message.correlationId.isNullOrBlank()) { + message.correlationId = message.toFormatedCorrelation() + } + message.updatedDate = Date() + return prioritizationMessageRepository.save(message) + } + + override suspend fun getMessage(id: String): MessagePrioritization { + return prioritizationMessageRepository.findById(id).orElseGet(null) + ?: throw BluePrintProcessorException("couldn't find message for id($id)") + } + + override suspend fun getExpiryEligibleMessages(count: Int): List? { + return prioritizationMessageRepository + .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count)) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int) + : List? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group, + states, Date(), PageRequest.of(0, count)) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int) + : List? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group, + states, Date(), PageRequest.of(0, count)) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int) + : List? { + return prioritizationMessageRepository.findByByGroupAndExpiredDate(group, + expiryDate, PageRequest.of(0, count)) + } + + override suspend fun getCorrelatedMessages(group: String, states: List, types: List?, + correlationIds: String): List? { + return if (!types.isNullOrEmpty()) { + prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) + } else { + prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) + } + } + + override suspend fun updateMessagesState(ids: List, state: String) { + ids.forEach { + val updated = updateMessageState(it, state) + log.info("message($it) update to state(${updated.state})") + } + } + + @Transactional + override suspend fun setMessageState(id: String, state: String) { + prioritizationMessageRepository.setStatusForMessageId(id, state) + } + + @Transactional + override suspend fun setMessagesState(ids: List, state: String) { + prioritizationMessageRepository.setStatusForMessageIds(ids, state) + } + + @Transactional + override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + } + return saveMessage(updateMessage) + } + + override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List) + : MessagePrioritization { + + val groupedIds = groupedMessageIds.joinToString(",") + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + this.aggregatedMessageIds = groupedIds + } + return saveMessage(updateMessage) + } + + override suspend fun deleteMessage(id: String) { + return prioritizationMessageRepository.deleteById(id) + } + + override suspend fun deleteMessageByGroup(group: String) { + return prioritizationMessageRepository.deleteGroup(group) + } + + override suspend fun deleteMessageStates(group: String, states: List) { + return prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + } + + override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { + return prioritizationMessageRepository.deleteGroupAndStateIn(group, + arrayListOf(MessageState.EXPIRED.name)) + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt new file mode 100644 index 000000000..8dd4019dd --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt @@ -0,0 +1,54 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor() { + + private val log = logger(MessageAggregateProcessor::class) + + override suspend fun processNB(key: String, value: String) { + + log.info("@@@@@ received in aggregation processor key($key), value($value)") + val ids = value.split(",").map { it.trim() } + if (!ids.isNullOrEmpty()) { + if (ids.size == 1) { + processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) + } else { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(ids) + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) + } + } + } + + /** Child will override this implementation , if necessary */ + open suspend fun handleAggregation(messageIds: List) { + log.info("messages($messageIds) aggregated") + messageIds.forEach { id -> + processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt new file mode 100644 index 000000000..34faa1b3b --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt @@ -0,0 +1,35 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor() { + + private val log = logger(MessageOutputProcessor::class) + + override suspend fun processNB(key: String, value: String) { + log.info("$$$$$ received in output processor key($key), value($value)") + val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name) + processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt new file mode 100644 index 000000000..a745e034c --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) + : AbstractBluePrintMessagePunctuator() { + + private val log = logger(MessagePriorityExpiryPunctuator::class) + lateinit var configuration: PrioritizationConfiguration + + override suspend fun punctuateNB(timestamp: Long) { + + log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})") + val expiryConfiguration = configuration.expiryConfiguration + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + + val expiredIds = fetchMessages?.map { it.id } + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expired -> + processorContext.forward(expired.id, expired, + To.child(MessagePrioritizationConstants.SINK_EXPIRED)) + } + } + } +} + +class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) + : AbstractBluePrintMessagePunctuator() { + + private val log = logger(MessagePriorityCleanPunctuator::class) + lateinit var configuration: PrioritizationConfiguration + + override suspend fun punctuateNB(timestamp: Long) { + log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})") + //TODO + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt new file mode 100644 index 000000000..00d454727 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.nio.charset.Charset + +open class MessagePrioritizationSerde : Serde { + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer { + return object : Deserializer { + override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { + return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer { + return object : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt new file mode 100644 index 000000000..5a5aa2575 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -0,0 +1,138 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.Cancellable +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.time.Duration +import java.util.* + + +open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor() { + + private val log = logger(MessagePrioritizeProcessor::class) + + lateinit var expiryCancellable: Cancellable + lateinit var cleanCancellable: Cancellable + + override suspend fun processNB(key: ByteArray, value: ByteArray) { + log.info("***** received in prioritize processor key(${String(key)})") + val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + // Save the Message + messagePrioritizationStateService.saveMessage(data) + handleCorrelationAndNextStep(data) + + } + + override fun init(context: ProcessorContext) { + super.init(context) + /** set up expiry marking cron */ + initializeExpiryPunctuator() + /** Set up cleaning records cron */ + initializeCleanPunctuator() + } + + override fun close() { + log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})") + expiryCancellable.cancel() + cleanCancellable.cancel() + } + + open fun initializeExpiryPunctuator() { + val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService) + expiryPunctuator.processorContext = processorContext + expiryPunctuator.configuration = prioritizationConfiguration + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli), + PunctuationType.WALL_CLOCK_TIME, expiryPunctuator) + log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") + } + + open fun initializeCleanPunctuator() { + val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService) + cleanPunctuator.processorContext = processorContext + cleanPunctuator.configuration = prioritizationConfiguration + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), + PunctuationType.WALL_CLOCK_TIME, cleanPunctuator) + log.info("Clean punctuator setup complete with expiry " + + "hold(${cleanConfiguration.expiredRecordsHoldDays})days") + } + + open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { + /** Check correlation enabled and correlation field has populated */ + if (!messagePrioritization.correlationId.isNullOrBlank()) { + val id = messagePrioritization.id + val group = messagePrioritization.group + val correlationId = messagePrioritization.correlationId!! + val types = getGroupCorrelationTypes(messagePrioritization) + log.info("checking correlation for message($id), group($group), types($types), " + + "correlation id($correlationId)") + + /** Get all previously received messages from database for group and optional types and correlation Id */ + val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId) + + /** If multiple records found, then check correlation */ + if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { + /** Check all correlation satisfies */ + val correlationResults = MessageCorrelationUtils + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + + if (correlationResults.correlated) { + /** Correlation satisfied */ + val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",") + /** Send only correlated ids to next processor */ + this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + } else { + /** Correlation not satisfied */ + log.trace("correlation not matched : ${correlationResults.message}") + val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id } + // Update the Message state to Wait + messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name) + } + } else { + /** received first message of group and correlation Id, update the message with wait state */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) + } + } else { + // No Correlation check needed, simply forward to next processor. + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) + this.processorContext.forward(messagePrioritization.id, messagePrioritization.id, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + } + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + return null + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt new file mode 100644 index 000000000..cc30af2f1 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +object MessageCorrelationUtils { + + /** Assumption is message is of same group **/ + fun correlatedMessages(collectedMessages: List): CorrelationCheckResponse { + val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated") + if (collectedMessages.size > 1) { + val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() } + if (filteredMessage.isNotEmpty()) { + val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() } + if (groupedMessage.size == 1) { + correlationCheckResponse.correlated = true + correlationCheckResponse.message = null + } + } + } else { + correlationCheckResponse.message = "received only one message for that group" + } + return correlationCheckResponse + } + + /** Assumption is message is of same group and checking for required types **/ + fun correlatedMessagesWithTypes(collectedMessages: List, types: List?) + : CorrelationCheckResponse { + + return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { + + val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } + if (!unknownMessageTypes.isNullOrEmpty()) { + throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") + } + + val copyTypes = types.toTypedArray().copyOf().toMutableList() + + val filteredMessage = collectedMessages.filter { + !it.correlationId.isNullOrBlank() + && types.contains(it.type) + } + var correlatedKeys: MutableSet = mutableSetOf() + if (filteredMessage.isNotEmpty()) { + val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } + val foundType = correlatedMap.keys.map { it.type } + copyTypes.removeAll(foundType) + correlatedKeys = correlatedMap.keys.map { + it.correlationId + }.toMutableSet() + } + /** Check if any Types missing and same correlation id for all types */ + return if (copyTypes.isEmpty()) { + if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) + else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") + } else { + CorrelationCheckResponse(message = "couldn't find types($copyTypes)") + } + } else { + return correlatedMessages(collectedMessages) + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt new file mode 100644 index 000000000..3281a97f9 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -0,0 +1,103 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import java.util.* + +object MessagePrioritizationSample { + + fun samplePrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + inputTopicSelector = "prioritize-input" + outputTopic = "prioritize-output-topic" + expiredTopic = "prioritize-expired-topic" + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10000L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 2000L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10000L + expiredRecordsHoldDays = 5 + } + } + } + + private fun currentDatePlusDays(days: Int): Date { + val calender = Calendar.getInstance() + calender.add(Calendar.DATE, days) + return calender.time + } + + fun sampleMessages(messageState: String, count: Int): List { + return sampleMessages("sample-group", messageState, count) + } + + fun sampleMessages(groupName: String, messageState: String, count: Int): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage(groupName, messageState, + "sample-type", null) + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage(groupName, messageState, "sample-type", + "key1=value1,key2=value2") + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String, + count: Int): List { + val messages: MutableList = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage(groupName, messageState, "type-$it", + "key1=value1,key2=value2") + messages.add(backPressureMessage) + } + return messages + } + + fun createMessage(groupName: String, messageState: String, messageType: String, + messageCorrelationId: String?): MessagePrioritization { + + return MessagePrioritization().apply { + id = UUID.randomUUID().toString() + group = groupName + type = messageType + state = messageState + correlationId = messageCorrelationId + message = "I am the Message" + createdDate = Date() + updatedDate = Date() + expiryDate = currentDatePlusDays(3) + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt new file mode 100644 index 000000000..bd99f72d0 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -0,0 +1,175 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import io.mockk.coEvery +import io.mockk.every +import io.mockk.spyk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.context.ApplicationContext +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.Test +import kotlin.test.assertNotNull + + +@RunWith(SpringRunner::class) +@DataJpaTest +@DirtiesContext +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class, + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]) +@TestPropertySource(properties = +[ + "spring.jpa.show-sql=true", + "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" +]) +open class MessagePrioritizationConsumerTest { + + @Autowired + lateinit var applicationContext: ApplicationContext + + @Autowired + lateinit var prioritizationMessageRepository: PrioritizationMessageRepository + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Before + fun setup() { + BluePrintDependencyService.inject(applicationContext) + } + + @Test + fun testBluePrintKafkaJDBCKeyStore() { + runBlocking { + assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") + + val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService + .instance(MessagePrioritizationStateService::class) + assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") + + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { + val message = messagePrioritizationService.saveMessage(it) + val repoResult = messagePrioritizationService.getMessage(message.id) + assertNotNull(repoResult, "failed to get inserted message.") + } + } + } + + @Test + fun testStartConsuming() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + + val streamingConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(configuration.inputTopicSelector) + assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") + + val spyStreamingConsumerService = spyk(streamingConsumerService) + coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit + coEvery { spyStreamingConsumerService.shutDown() } returns Unit + val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) + + + // Test Topology + val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) + assertNotNull(topology, "failed to get create topology") + + every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService + spyMessagePrioritizationConsumer.startConsuming(configuration) + spyMessagePrioritizationConsumer.shutDown() + } + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testMessagePrioritizationConsumer() { + runBlocking { + val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + + /** Send sample message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), + headers = headers) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), + headers = headers) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(2000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), + headers = headers) + } + } + delay(10000) + messagePrioritizationConsumer.shutDown() + } + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt new file mode 100644 index 000000000..4e3eb191b --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -0,0 +1,57 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.PrimaryDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Service +import javax.sql.DataSource + +@Configuration +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"]) +@EnableAutoConfiguration +open class TestDatabaseConfiguration { + + @Bean("primaryDBLibGenericService") + open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService { + return PrimaryDBLibGenericService(NamedParameterJdbcTemplate(dataSource)) + } +} + +@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) +open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + return when (messagePrioritization.group) { + "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + else -> null + } + } +} + +@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) +open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() + +@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) +open class DefaultMessageOutputProcessor : MessageOutputProcessor() \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt new file mode 100644 index 000000000..b470db909 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import kotlin.test.assertTrue + +class MessageCorrelationUtilsTest { + + @Test + fun testCorrelationKeysReordered() { + + val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2") + val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-0", "key2=value2,key1=value1") + + val multipleMessages: MutableList = arrayListOf() + multipleMessages.add(message1) + multipleMessages.add(message2) + val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages) + assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered") + } + + @Test + fun differentTypesWithSameCorrelationMessages() { + /** With Types **/ + /* Assumption is Same group with different types */ + val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) + val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2")) + assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResponse") + + /* Assumption is Same group with different types and one missing expected types, + In this case type-3 message is missing */ + val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3")) + assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType") + } + + @Test + fun withSameCorrelationMessagesWithIgnoredTypes() { + /** With ignoring Types */ + /** Assumption is only one message received */ + val withSameCorrelationOneMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) + val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationOneMessages, null) + assertTrue(!withSameCorrelationOneMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp") + + /** Assumption is two message received for same group with same correlation */ + val withSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) + val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationMessages, null) + assertTrue(withSameCorrelationMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp") + } + + @Test + fun differentTypesWithDifferentCorrelationMessage() { + /** Assumption is two message received for same group with different expected types and different correlation */ + val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2") + val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-1", "key1=value1,key2=value3") + val differentTypesWithDifferentCorrelationMessage: MutableList = arrayListOf() + differentTypesWithDifferentCorrelationMessage.add(message1) + differentTypesWithDifferentCorrelationMessage.add(message2) + val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithDifferentCorrelationMessage, + arrayListOf("type-0", "type-1")) + assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated, + "failed to correlate differentTypesWithDifferentCorrelationMessageResp") + } +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml new file mode 100644 index 000000000..e3a1f7a01 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + ${localPattern} + + + + + + + + + + + + + diff --git a/ms/blueprintsprocessor/functions/pom.xml b/ms/blueprintsprocessor/functions/pom.xml index 3ee6737ef..38f9071ee 100755 --- a/ms/blueprintsprocessor/functions/pom.xml +++ b/ms/blueprintsprocessor/functions/pom.xml @@ -39,6 +39,7 @@ restconf-executor cli-executor config-snapshots + message-prioritizaion diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt new file mode 100644 index 000000000..4c6c0acdd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt @@ -0,0 +1,65 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.Punctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** CDS Kafka Stream Processor abstract class to implement */ +abstract class AbstractBluePrintMessageProcessor : Processor { + + private val log = logger(AbstractBluePrintMessageProcessor::class) + + lateinit var processorContext: ProcessorContext + + + override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) { + try { + processNB(key, value) + } catch (e: Exception) { + log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e) + } + } + + override fun init(context: ProcessorContext) { + log.info("initializing processor (${this.javaClass.simpleName})") + this.processorContext = context + + } + + override fun close() { + log.info("closing processor (${this.javaClass.simpleName})") + } + + abstract suspend fun processNB(key: K, value: V) +} + +/** CDS Kafka Stream Punctuator abstract class to implement */ +abstract class AbstractBluePrintMessagePunctuator : Punctuator { + lateinit var processorContext: ProcessorContext + + + override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) { + punctuateNB(timestamp) + } + + abstract suspend fun punctuateNB(timestamp: Long) +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt new file mode 100644 index 000000000..86ccd74a2 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt @@ -0,0 +1,143 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +/* +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.StateStore +import org.apache.kafka.streams.state.StoreBuilder +import org.apache.kafka.streams.state.StoreSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import java.util.* + + +class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier { + + override fun get(): KafkaJDBCStore { + // Get the DBLibGenericService Instance + val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService() + return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService) + } + + override fun name(): String { + return name + } + + override fun metricsScope(): String { + return "jdbc-state" + } +} + +class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier) + : StoreBuilder { + + private var logConfig: MutableMap = HashMap() + private var enableCaching: Boolean = false + private var enableLogging = true + + override fun logConfig(): MutableMap { + return logConfig + } + + override fun withCachingDisabled(): StoreBuilder { + enableCaching = false + return this + } + + override fun loggingEnabled(): Boolean { + return enableLogging + } + + override fun withLoggingDisabled(): StoreBuilder { + enableLogging = false + return this + } + + override fun withCachingEnabled(): StoreBuilder { + enableCaching = true + return this + } + + override fun withLoggingEnabled(config: MutableMap?): StoreBuilder { + enableLogging = true + return this + } + + override fun name(): String { + return "KafkaJDBCKeyStoreBuilder" + } + + override fun build(): KafkaJDBCStore { + return storeSupplier.get() + } +} + +interface KafkaJDBCStore : StateStore { + + suspend fun query(sql: String, params: Map): List> + + suspend fun update(sql: String, params: Map): Int +} + + +class KafkaJDBCStoreImpl(private val name: String, + private val bluePrintDBLibGenericService: BluePrintDBLibGenericService) + : KafkaJDBCStore { + + private val log = logger(KafkaJDBCStoreImpl::class) + + override fun isOpen(): Boolean { + log.info("isOpen...") + return true + } + + override fun init(context: ProcessorContext, root: StateStore) { + log.info("init...") + } + + override fun flush() { + log.info("flush...") + } + + override fun close() { + log.info("Close...") + } + + override fun name(): String { + return name + } + + override fun persistent(): Boolean { + return true + } + + override suspend fun query(sql: String, params: Map): List> { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.query(sql, params) + } + + override suspend fun update(sql: String, params: Map): Int { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.update(sql, params) + } +} +*/ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt index 229e462da..d0297df4c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope val streamsConfig = streamsConfig(additionalConfig) val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + log.info("Kafka streams topology : ${topology.describe()}") kafkaStreams = KafkaStreams(topology, streamsConfig) kafkaStreams.cleanUp() kafkaStreams.start() -- cgit 1.2.3-korg