aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/README.txt28
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml43
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt41
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt37
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt101
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt69
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt44
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt98
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt74
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt172
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt54
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt35
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt64
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt64
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt138
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt82
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt103
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt175
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt57
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt98
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml42
21 files changed, 1619 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt
new file mode 100644
index 000000000..baf168767
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt
@@ -0,0 +1,28 @@
+
+To Delete Topics
+------------------
+kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic
+kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic
+kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic
+kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog
+
+Create Topics
+--------------
+
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic
+
+To List topics
+----------------
+kafka-topics --list --bootstrap-server localhost:9092
+
+
+To Listen for Output
+----------------------
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
+
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
+
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning
+
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
new file mode 100644
index 000000000..ac46b3635
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>functions</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId>
+ <artifactId>message-prioritizaion</artifactId>
+
+ <name>Blueprints Processor Function - Message Prioritization</name>
+ <description>Blueprints Processor Function - Message Prioritization</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+ <artifactId>message-lib</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
new file mode 100644
index 000000000..d89f71364
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */
+abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
+
+ private val log = logger(AbstractMessagePrioritizeProcessor::class)
+
+ lateinit var prioritizationConfiguration: PrioritizationConfiguration
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+
+ override fun init(context: ProcessorContext) {
+ this.processorContext = context
+ /** Get the State service to update in store */
+ this.messagePrioritizationStateService = BluePrintDependencyService
+ .instance(MessagePrioritizationStateService::class)
+
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
new file mode 100644
index 000000000..cce883c91
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+@ComponentScan
+open class MessagePrioritizationConfiguration
+
+
+object MessagePrioritizationConstants {
+
+ const val SOURCE_INPUT = "source-prioritization-input"
+
+ const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
+ const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate"
+ const val PROCESSOR_OUTPUT = "processor-prioritization-output"
+
+ const val SINK_OUTPUT = "sink-prioritization-output"
+ const val SINK_EXPIRED = "sink-prioritization-expired"
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
new file mode 100644
index 000000000..ef9d5a058
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+open class MessagePrioritizationConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) {
+
+ private val log = logger(MessagePrioritizationConsumer::class)
+
+ lateinit var streamingConsumerService: BlueprintMessageConsumerService
+
+ open fun consumerService(selector: String): BlueprintMessageConsumerService {
+ return bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(selector)
+ }
+
+ open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration)
+ : KafkaStreamConsumerFunction {
+ return object : KafkaStreamConsumerFunction {
+
+ override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?): Topology {
+
+ val topology = Topology()
+ val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
+
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ log.info("Consuming prioritization topics($topics)")
+
+ topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
+
+ topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ bluePrintProcessorSupplier<ByteArray, ByteArray>(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ prioritizationConfiguration),
+ MessagePrioritizationConstants.SOURCE_INPUT)
+
+ topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
+ bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
+ prioritizationConfiguration),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+
+ topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
+ bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
+ prioritizationConfiguration),
+ MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+
+ topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED,
+ prioritizationConfiguration.expiredTopic,
+ Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+
+ topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT,
+ prioritizationConfiguration.outputTopic,
+ Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ MessagePrioritizationConstants.PROCESSOR_OUTPUT)
+
+ // Output will be sent to the group-output topic from Processor API
+ return topology
+ }
+ }
+ }
+
+ suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
+ streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
+
+ // Dynamic Consumer Function to create Topology
+ val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
+ streamingConsumerService.consume(null, consumerFunction)
+ }
+
+ suspend fun shutDown() {
+ if (streamingConsumerService != null) {
+ streamingConsumerService.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
new file mode 100644
index 000000000..d874cef92
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import java.io.Serializable
+
+object MessageActionConstants {
+ const val PRIORITIZE = "prioritize"
+}
+
+enum class MessageState(val id: String) {
+ NEW("new"),
+ WAIT("wait"),
+ EXPIRED("expired"),
+ PRIORITIZED("prioritized"),
+ AGGREGATED("aggregated"),
+ IGNORED("ignored"),
+ COMPLETED("completed"),
+}
+
+open class PrioritizationConfiguration : Serializable {
+ lateinit var expiryConfiguration: ExpiryConfiguration
+ lateinit var shutDownConfiguration: ShutDownConfiguration
+ lateinit var cleanConfiguration: CleanConfiguration
+ lateinit var inputTopicSelector: String // Consumer Configuration Selector
+ lateinit var expiredTopic: String // Publish Configuration Selector
+ lateinit var outputTopic: String // Publish Configuration Selector
+}
+
+open class ExpiryConfiguration : Serializable {
+ var frequencyMilli: Long = 30000L
+ var maxPollRecord: Int = 1000
+}
+
+open class ShutDownConfiguration : Serializable {
+ var waitMill: Long = 30000L
+}
+
+open class CleanConfiguration : Serializable {
+ var frequencyMilli: Long = 30000L
+ var expiredRecordsHoldDays: Int = 5
+}
+
+open class UpdateStateRequest : Serializable {
+ lateinit var id: String
+ var group: String? = null
+ var state: String? = null
+ var notifyMessage: String? = null
+}
+
+data class CorrelationCheckResponse(var message: String? = null,
+ var correlated: Boolean = false)
+
+data class TypeCorrelationKey(val type: String, val correlationId: String)
+
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
new file mode 100644
index 000000000..94fedf4df
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+
+fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
+ : ProcessorSupplier<K, V> {
+ return ProcessorSupplier<K, V> {
+ // Dynamically resolve the Prioritization Processor
+ val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
+ processorInstance.prioritizationConfiguration = prioritizationConfiguration
+ processorInstance
+ }
+}
+
+fun MessagePrioritization.toFormatedCorrelation(): String {
+ val ascendingKey = this.correlationId!!.split(",")
+ .map { it.trim() }.sorted().joinToString(",")
+ return ascendingKey
+}
+
+fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
+ val ascendingKey = this.correlationId!!.split(",")
+ .map { it.trim() }.sorted().joinToString(",")
+ return TypeCorrelationKey(this.type, ascendingKey)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
new file mode 100644
index 000000000..307d932a9
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import org.springframework.data.domain.Pageable
+import org.springframework.data.jpa.repository.JpaRepository
+import org.springframework.data.jpa.repository.Modifying
+import org.springframework.data.jpa.repository.Query
+import org.springframework.stereotype.Repository
+import org.springframework.transaction.annotation.Transactional
+import java.util.*
+
+@Repository
+@Transactional(readOnly = true)
+interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> {
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc")
+ fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "ORDER BY pm.createdDate asc")
+ fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "ORDER BY pm.updatedDate asc")
+ fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable)
+ : List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc")
+ fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List<String>, expiryCheckDate: Date,
+ count: Pageable): List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.state in :states " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
+ fun findByStateInAndExpiredDate(states: List<String>, expiryCheckDate: Date,
+ count: Pageable): List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
+ fun findByGroupAndStateInAndExpiredDate(group: String, states: List<String>, expiryCheckDate: Date,
+ count: Pageable): List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
+ fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc")
+ fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String)
+ : List<MessagePrioritization>?
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc")
+ fun findByGroupAndTypesAndCorrelationId(group: String, states: List<String>, types: List<String>,
+ correlationId: String): List<MessagePrioritization>?
+
+ @Modifying
+ @Transactional
+ @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id")
+ fun setStatusForMessageId(id: String, state: String): Int
+
+ @Modifying
+ @Transactional
+ @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids")
+ fun setStatusForMessageIds(ids: List<String>, state: String): Int
+
+ @Modifying
+ @Transactional
+ @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " +
+ "WHERE pm.id = :id")
+ fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group")
+ fun deleteGroup(group: String)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states")
+ fun deleteGroupAndStateIn(group: String, states: List<String>)
+}
+
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
new file mode 100644
index 000000000..4973cdf6e
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import com.fasterxml.jackson.annotation.JsonFormat
+import org.hibernate.annotations.Proxy
+import org.springframework.data.annotation.LastModifiedDate
+import org.springframework.data.jpa.domain.support.AuditingEntityListener
+import org.springframework.data.jpa.repository.config.EnableJpaAuditing
+import java.util.*
+import javax.persistence.*
+
+@EnableJpaAuditing
+@EntityListeners(AuditingEntityListener::class)
+@Entity
+@Table(name = "MESSAGE_PRIORITIZATION")
+@Proxy(lazy = false)
+open class MessagePrioritization {
+ @Id
+ @Column(name = "message_id", length = 50)
+ lateinit var id: String
+
+ @Column(name = "message_group", length = 50, nullable = false)
+ lateinit var group: String
+
+ @Column(name = "message_type", length = 50, nullable = false)
+ lateinit var type: String
+
+ /** States Defined by MessageState */
+ @Column(name = "message_state", length = 20, nullable = false)
+ lateinit var state: String
+
+ @Lob
+ @Column(name = "message", nullable = false)
+ var message: String? = null
+
+ @Lob
+ @Column(name = "aggregated_message_ids", nullable = true)
+ var aggregatedMessageIds: String? = null
+
+ @Lob
+ @Column(name = "correlation_id", nullable = true)
+ var correlationId: String? = null
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ @Temporal(TemporalType.TIMESTAMP)
+ @Column(name = "created_date", nullable = false)
+ var createdDate = Date()
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ @LastModifiedDate
+ @Temporal(TemporalType.TIMESTAMP)
+ @Column(name = "updated_date", nullable = false)
+ var updatedDate: Date? = null
+
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ @Temporal(TemporalType.TIMESTAMP)
+ @Column(name = "expiry_date", nullable = false)
+ var expiryDate: Date? = null
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
new file mode 100644
index 000000000..e4369fc20
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
@@ -0,0 +1,172 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.data.domain.PageRequest
+import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
+import java.util.*
+
+interface MessagePrioritizationStateService {
+
+ suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+ suspend fun getMessage(id: String): MessagePrioritization
+
+ suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+
+ suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+ suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, correlationIds: String): List<MessagePrioritization>?
+
+ suspend fun updateMessagesState(ids: List<String>, state: String)
+
+ suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+ suspend fun setMessageState(id: String, state: String)
+
+ suspend fun setMessagesState(ids: List<String>, state: String)
+
+ suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List<String>): MessagePrioritization
+
+ suspend fun deleteMessage(id: String)
+
+ suspend fun deleteMessageByGroup(group: String)
+
+ suspend fun deleteMessageStates(group: String, states: List<String>)
+
+ suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
+}
+
+@Service
+open class MessagePrioritizationStateServiceImpl(
+ private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService {
+
+ private val log = logger(MessagePrioritizationStateServiceImpl::class)
+
+ @Transactional
+ override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
+ if (!message.correlationId.isNullOrBlank()) {
+ message.correlationId = message.toFormatedCorrelation()
+ }
+ message.updatedDate = Date()
+ return prioritizationMessageRepository.save(message)
+ }
+
+ override suspend fun getMessage(id: String): MessagePrioritization {
+ return prioritizationMessageRepository.findById(id).orElseGet(null)
+ ?: throw BluePrintProcessorException("couldn't find message for id($id)")
+ }
+
+ override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
+ return prioritizationMessageRepository
+ .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
+ Date(), PageRequest.of(0, count))
+ }
+
+ override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
+ : List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group,
+ states, Date(), PageRequest.of(0, count))
+ }
+
+ override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
+ : List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group,
+ states, Date(), PageRequest.of(0, count))
+ }
+
+ override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int)
+ : List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByByGroupAndExpiredDate(group,
+ expiryDate, PageRequest.of(0, count))
+ }
+
+ override suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
+ correlationIds: String): List<MessagePrioritization>? {
+ return if (!types.isNullOrEmpty()) {
+ prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
+ } else {
+ prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
+ }
+ }
+
+ override suspend fun updateMessagesState(ids: List<String>, state: String) {
+ ids.forEach {
+ val updated = updateMessageState(it, state)
+ log.info("message($it) update to state(${updated.state})")
+ }
+ }
+
+ @Transactional
+ override suspend fun setMessageState(id: String, state: String) {
+ prioritizationMessageRepository.setStatusForMessageId(id, state)
+ }
+
+ @Transactional
+ override suspend fun setMessagesState(ids: List<String>, state: String) {
+ prioritizationMessageRepository.setStatusForMessageIds(ids, state)
+ }
+
+ @Transactional
+ override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
+ val updateMessage = getMessage(id).apply {
+ this.updatedDate = Date()
+ this.state = state
+ }
+ return saveMessage(updateMessage)
+ }
+
+ override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>)
+ : MessagePrioritization {
+
+ val groupedIds = groupedMessageIds.joinToString(",")
+ val updateMessage = getMessage(id).apply {
+ this.updatedDate = Date()
+ this.state = state
+ this.aggregatedMessageIds = groupedIds
+ }
+ return saveMessage(updateMessage)
+ }
+
+ override suspend fun deleteMessage(id: String) {
+ return prioritizationMessageRepository.deleteById(id)
+ }
+
+ override suspend fun deleteMessageByGroup(group: String) {
+ return prioritizationMessageRepository.deleteGroup(group)
+ }
+
+ override suspend fun deleteMessageStates(group: String, states: List<String>) {
+ return prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+ }
+
+ override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
+ return prioritizationMessageRepository.deleteGroupAndStateIn(group,
+ arrayListOf(MessageState.EXPIRED.name))
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
new file mode 100644
index 000000000..8dd4019dd
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
+
+ private val log = logger(MessageAggregateProcessor::class)
+
+ override suspend fun processNB(key: String, value: String) {
+
+ log.info("@@@@@ received in aggregation processor key($key), value($value)")
+ val ids = value.split(",").map { it.trim() }
+ if (!ids.isNullOrEmpty()) {
+ if (ids.size == 1) {
+ processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
+ } else {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(ids)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ }
+ }
+ }
+
+ /** Child will override this implementation , if necessary */
+ open suspend fun handleAggregation(messageIds: List<String>) {
+ log.info("messages($messageIds) aggregated")
+ messageIds.forEach { id ->
+ processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
new file mode 100644
index 000000000..34faa1b3b
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
+
+ private val log = logger(MessageOutputProcessor::class)
+
+ override suspend fun processNB(key: String, value: String) {
+ log.info("$$$$$ received in output processor key($key), value($value)")
+ val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
+ processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
new file mode 100644
index 000000000..a745e034c
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
+ : AbstractBluePrintMessagePunctuator() {
+
+ private val log = logger(MessagePriorityExpiryPunctuator::class)
+ lateinit var configuration: PrioritizationConfiguration
+
+ override suspend fun punctuateNB(timestamp: Long) {
+
+ log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})")
+ val expiryConfiguration = configuration.expiryConfiguration
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+
+ val expiredIds = fetchMessages?.map { it.id }
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expired ->
+ processorContext.forward(expired.id, expired,
+ To.child(MessagePrioritizationConstants.SINK_EXPIRED))
+ }
+ }
+ }
+}
+
+class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
+ : AbstractBluePrintMessagePunctuator() {
+
+ private val log = logger(MessagePriorityCleanPunctuator::class)
+ lateinit var configuration: PrioritizationConfiguration
+
+ override suspend fun punctuateNB(timestamp: Long) {
+ log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})")
+ //TODO
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt
new file mode 100644
index 000000000..00d454727
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.nio.charset.Charset
+
+open class MessagePrioritizationSerde : Serde<MessagePrioritization> {
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+
+ override fun deserializer(): Deserializer<MessagePrioritization> {
+ return object : Deserializer<MessagePrioritization> {
+ override fun deserialize(topic: String, data: ByteArray): MessagePrioritization {
+ return JacksonUtils.readValue(String(data), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ }
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+ }
+ }
+
+ override fun serializer(): Serializer<MessagePrioritization> {
+ return object : Serializer<MessagePrioritization> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, data: MessagePrioritization): ByteArray {
+ return data.asJsonString().toByteArray(Charset.defaultCharset())
+ }
+
+ override fun close() {
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
new file mode 100644
index 000000000..5a5aa2575
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.Cancellable
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.PunctuationType
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.time.Duration
+import java.util.*
+
+
+open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+ private val log = logger(MessagePrioritizeProcessor::class)
+
+ lateinit var expiryCancellable: Cancellable
+ lateinit var cleanCancellable: Cancellable
+
+ override suspend fun processNB(key: ByteArray, value: ByteArray) {
+ log.info("***** received in prioritize processor key(${String(key)})")
+ val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ // Save the Message
+ messagePrioritizationStateService.saveMessage(data)
+ handleCorrelationAndNextStep(data)
+
+ }
+
+ override fun init(context: ProcessorContext) {
+ super.init(context)
+ /** set up expiry marking cron */
+ initializeExpiryPunctuator()
+ /** Set up cleaning records cron */
+ initializeCleanPunctuator()
+ }
+
+ override fun close() {
+ log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})")
+ expiryCancellable.cancel()
+ cleanCancellable.cancel()
+ }
+
+ open fun initializeExpiryPunctuator() {
+ val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
+ expiryPunctuator.processorContext = processorContext
+ expiryPunctuator.configuration = prioritizationConfiguration
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli),
+ PunctuationType.WALL_CLOCK_TIME, expiryPunctuator)
+ log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
+ }
+
+ open fun initializeCleanPunctuator() {
+ val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
+ cleanPunctuator.processorContext = processorContext
+ cleanPunctuator.configuration = prioritizationConfiguration
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+ PunctuationType.WALL_CLOCK_TIME, cleanPunctuator)
+ log.info("Clean punctuator setup complete with expiry " +
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days")
+ }
+
+ open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
+ /** Check correlation enabled and correlation field has populated */
+ if (!messagePrioritization.correlationId.isNullOrBlank()) {
+ val id = messagePrioritization.id
+ val group = messagePrioritization.group
+ val correlationId = messagePrioritization.correlationId!!
+ val types = getGroupCorrelationTypes(messagePrioritization)
+ log.info("checking correlation for message($id), group($group), types($types), " +
+ "correlation id($correlationId)")
+
+ /** Get all previously received messages from database for group and optional types and correlation Id */
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId)
+
+ /** If multiple records found, then check correlation */
+ if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
+ /** Check all correlation satisfies */
+ val correlationResults = MessageCorrelationUtils
+ .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
+
+ if (correlationResults.correlated) {
+ /** Correlation satisfied */
+ val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
+ /** Send only correlated ids to next processor */
+ this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds,
+ To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+ } else {
+ /** Correlation not satisfied */
+ log.trace("correlation not matched : ${correlationResults.message}")
+ val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
+ // Update the Message state to Wait
+ messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
+ }
+ } else {
+ /** received first message of group and correlation Id, update the message with wait state */
+ messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
+ }
+ } else {
+ // No Correlation check needed, simply forward to next processor.
+ messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
+ this.processorContext.forward(messagePrioritization.id, messagePrioritization.id,
+ To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+ }
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return null
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
new file mode 100644
index 000000000..cc30af2f1
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+object MessageCorrelationUtils {
+
+ /** Assumption is message is of same group **/
+ fun correlatedMessages(collectedMessages: List<MessagePrioritization>): CorrelationCheckResponse {
+ val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated")
+ if (collectedMessages.size > 1) {
+ val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() }
+ if (filteredMessage.isNotEmpty()) {
+ val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() }
+ if (groupedMessage.size == 1) {
+ correlationCheckResponse.correlated = true
+ correlationCheckResponse.message = null
+ }
+ }
+ } else {
+ correlationCheckResponse.message = "received only one message for that group"
+ }
+ return correlationCheckResponse
+ }
+
+ /** Assumption is message is of same group and checking for required types **/
+ fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?)
+ : CorrelationCheckResponse {
+
+ return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
+
+ val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id }
+ if (!unknownMessageTypes.isNullOrEmpty()) {
+ throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)")
+ }
+
+ val copyTypes = types.toTypedArray().copyOf().toMutableList()
+
+ val filteredMessage = collectedMessages.filter {
+ !it.correlationId.isNullOrBlank()
+ && types.contains(it.type)
+ }
+ var correlatedKeys: MutableSet<String> = mutableSetOf()
+ if (filteredMessage.isNotEmpty()) {
+ val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() }
+ val foundType = correlatedMap.keys.map { it.type }
+ copyTypes.removeAll(foundType)
+ correlatedKeys = correlatedMap.keys.map {
+ it.correlationId
+ }.toMutableSet()
+ }
+ /** Check if any Types missing and same correlation id for all types */
+ return if (copyTypes.isEmpty()) {
+ if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true)
+ else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)")
+ } else {
+ CorrelationCheckResponse(message = "couldn't find types($copyTypes)")
+ }
+ } else {
+ return correlatedMessages(collectedMessages)
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
new file mode 100644
index 000000000..3281a97f9
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.*
+
+object MessagePrioritizationSample {
+
+ fun samplePrioritizationConfiguration(): PrioritizationConfiguration {
+ return PrioritizationConfiguration().apply {
+ inputTopicSelector = "prioritize-input"
+ outputTopic = "prioritize-output-topic"
+ expiredTopic = "prioritize-expired-topic"
+ expiryConfiguration = ExpiryConfiguration().apply {
+ frequencyMilli = 10000L
+ maxPollRecord = 2000
+ }
+ shutDownConfiguration = ShutDownConfiguration().apply {
+ waitMill = 2000L
+ }
+ cleanConfiguration = CleanConfiguration().apply {
+ frequencyMilli = 10000L
+ expiredRecordsHoldDays = 5
+ }
+ }
+ }
+
+ private fun currentDatePlusDays(days: Int): Date {
+ val calender = Calendar.getInstance()
+ calender.add(Calendar.DATE, days)
+ return calender.time
+ }
+
+ fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
+ return sampleMessages("sample-group", messageState, count)
+ }
+
+ fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+ val messages: MutableList<MessagePrioritization> = arrayListOf()
+ repeat(count) {
+ val backPressureMessage = createMessage(groupName, messageState,
+ "sample-type", null)
+ messages.add(backPressureMessage)
+ }
+ return messages
+ }
+
+ fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+ val messages: MutableList<MessagePrioritization> = arrayListOf()
+ repeat(count) {
+ val backPressureMessage = createMessage(groupName, messageState, "sample-type",
+ "key1=value1,key2=value2")
+ messages.add(backPressureMessage)
+ }
+ return messages
+ }
+
+ fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String,
+ count: Int): List<MessagePrioritization> {
+ val messages: MutableList<MessagePrioritization> = arrayListOf()
+ repeat(count) {
+ val backPressureMessage = createMessage(groupName, messageState, "type-$it",
+ "key1=value1,key2=value2")
+ messages.add(backPressureMessage)
+ }
+ return messages
+ }
+
+ fun createMessage(groupName: String, messageState: String, messageType: String,
+ messageCorrelationId: String?): MessagePrioritization {
+
+ return MessagePrioritization().apply {
+ id = UUID.randomUUID().toString()
+ group = groupName
+ type = messageType
+ state = messageState
+ correlationId = messageCorrelationId
+ message = "I am the Message"
+ createdDate = Date()
+ updatedDate = Date()
+ expiryDate = currentDatePlusDays(3)
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
new file mode 100644
index 000000000..bd99f72d0
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -0,0 +1,175 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import io.mockk.coEvery
+import io.mockk.every
+import io.mockk.spyk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.context.ApplicationContext
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DataJpaTest
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class,
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class])
+@TestPropertySource(properties =
+[
+ "spring.jpa.show-sql=true",
+ "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+])
+open class MessagePrioritizationConsumerTest {
+
+ @Autowired
+ lateinit var applicationContext: ApplicationContext
+
+ @Autowired
+ lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
+
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Before
+ fun setup() {
+ BluePrintDependencyService.inject(applicationContext)
+ }
+
+ @Test
+ fun testBluePrintKafkaJDBCKeyStore() {
+ runBlocking {
+ assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
+
+ val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
+ .instance(MessagePrioritizationStateService::class)
+ assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
+
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
+ val message = messagePrioritizationService.saveMessage(it)
+ val repoResult = messagePrioritizationService.getMessage(message.id)
+ assertNotNull(repoResult, "failed to get inserted message.")
+ }
+ }
+ }
+
+ @Test
+ fun testStartConsuming() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+
+ val streamingConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService(configuration.inputTopicSelector)
+ assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyStreamingConsumerService = spyk(streamingConsumerService)
+ coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
+ coEvery { spyStreamingConsumerService.shutDown() } returns Unit
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
+
+
+ // Test Topology
+ val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+ val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
+ assertNotNull(topology, "failed to get create topology")
+
+ every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
+ spyMessagePrioritizationConsumer.startConsuming(configuration)
+ spyMessagePrioritizationConsumer.shutDown()
+ }
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testMessagePrioritizationConsumer() {
+ runBlocking {
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+ /** Send sample message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+ headers = headers)
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+ headers = headers)
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(2000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+ headers = headers)
+ }
+ }
+ delay(10000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
new file mode 100644
index 000000000..4e3eb191b
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.PrimaryDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
+import org.springframework.stereotype.Service
+import javax.sql.DataSource
+
+@Configuration
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"])
+@EnableAutoConfiguration
+open class TestDatabaseConfiguration {
+
+ @Bean("primaryDBLibGenericService")
+ open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService {
+ return PrimaryDBLibGenericService(NamedParameterJdbcTemplate(dataSource))
+ }
+}
+
+@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return when (messagePrioritization.group) {
+ "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ else -> null
+ }
+ }
+}
+
+@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
+
+@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
+open class DefaultMessageOutputProcessor : MessageOutputProcessor() \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
new file mode 100644
index 000000000..b470db909
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import kotlin.test.assertTrue
+
+class MessageCorrelationUtilsTest {
+
+ @Test
+ fun testCorrelationKeysReordered() {
+
+ val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2")
+ val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-0", "key2=value2,key1=value1")
+
+ val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
+ multipleMessages.add(message1)
+ multipleMessages.add(message2)
+ val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages)
+ assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered")
+ }
+
+ @Test
+ fun differentTypesWithSameCorrelationMessages() {
+ /** With Types **/
+ /* Assumption is Same group with different types */
+ val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
+ val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2"))
+ assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResponse")
+
+ /* Assumption is Same group with different types and one missing expected types,
+ In this case type-3 message is missing */
+ val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2", "type-3"))
+ assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType")
+ }
+
+ @Test
+ fun withSameCorrelationMessagesWithIgnoredTypes() {
+ /** With ignoring Types */
+ /** Assumption is only one message received */
+ val withSameCorrelationOneMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
+ val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationOneMessages, null)
+ assertTrue(!withSameCorrelationOneMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp")
+
+ /** Assumption is two message received for same group with same correlation */
+ val withSameCorrelationMessages = MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
+ val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ withSameCorrelationMessages, null)
+ assertTrue(withSameCorrelationMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp")
+ }
+
+ @Test
+ fun differentTypesWithDifferentCorrelationMessage() {
+ /** Assumption is two message received for same group with different expected types and different correlation */
+ val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2")
+ val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+ "type-1", "key1=value1,key2=value3")
+ val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
+ differentTypesWithDifferentCorrelationMessage.add(message1)
+ differentTypesWithDifferentCorrelationMessage.add(message2)
+ val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+ differentTypesWithDifferentCorrelationMessage,
+ arrayListOf("type-0", "type-1"))
+ assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated,
+ "failed to correlate differentTypesWithDifferentCorrelationMessageResp")
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..e3a1f7a01
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>${localPattern}</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.springframework.test" level="warn"/>
+ <logger name="org.springframework" level="warn"/>
+ <logger name="org.hibernate.type.descriptor.sql" level="warn"/>
+ <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>