diff options
author | Dan Timoney <dtimoney@att.com> | 2019-11-18 14:14:38 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-11-18 14:14:38 +0000 |
commit | 053895e37df903e11ef29b4e28572a31f896dba9 (patch) | |
tree | 51619901063f6ca988558bdeb5a95b87eb581245 /ms | |
parent | f69938ae11e0db78d41b8ea3397a51938161f022 (diff) | |
parent | ada372a56d25b41e5a0926a18bf911d20102810f (diff) |
Merge "Add message prioritization module"
Diffstat (limited to 'ms')
25 files changed, 1829 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt new file mode 100644 index 000000000..baf168767 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt @@ -0,0 +1,28 @@ + +To Delete Topics +------------------ +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic +kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic +kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog + +Create Topics +-------------- + +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic + +To List topics +---------------- +kafka-topics --list --bootstrap-server localhost:9092 + + +To Listen for Output +---------------------- +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning + +kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml new file mode 100644 index 000000000..ac46b3635 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Copyright © 2018-2019 AT&T Intellectual Property. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> + <artifactId>functions</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + + <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId> + <artifactId>message-prioritizaion</artifactId> + + <name>Blueprints Processor Function - Message Prioritization</name> + <description>Blueprints Processor Function - Message Prioritization</description> + + <dependencies> + <dependency> + <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> + <artifactId>message-lib</artifactId> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + </dependency> + </dependencies> +</project> diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt new file mode 100644 index 000000000..d89f71364 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt @@ -0,0 +1,41 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */ +abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() { + + private val log = logger(AbstractMessagePrioritizeProcessor::class) + + lateinit var prioritizationConfiguration: PrioritizationConfiguration + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + + override fun init(context: ProcessorContext) { + this.processorContext = context + /** Get the State service to update in store */ + this.messagePrioritizationStateService = BluePrintDependencyService + .instance(MessagePrioritizationStateService::class) + + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt new file mode 100644 index 000000000..cce883c91 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -0,0 +1,37 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@ComponentScan +open class MessagePrioritizationConfiguration + + +object MessagePrioritizationConstants { + + const val SOURCE_INPUT = "source-prioritization-input" + + const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" + const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate" + const val PROCESSOR_OUTPUT = "processor-prioritization-output" + + const val SINK_OUTPUT = "sink-prioritization-output" + const val SINK_EXPIRED = "sink-prioritization-expired" +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt new file mode 100644 index 000000000..ef9d5a058 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -0,0 +1,101 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +open class MessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) { + + private val log = logger(MessagePrioritizationConsumer::class) + + lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration) + : KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>?): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier<ByteArray, ByteArray>(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + prioritizationConfiguration), + MessagePrioritizationConstants.SOURCE_INPUT) + + topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + prioritizationConfiguration), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) + + topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT, + bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_OUTPUT, + prioritizationConfiguration), + MessagePrioritizationConstants.PROCESSOR_AGGREGATE) + + topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED, + prioritizationConfiguration.expiredTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) + + topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT, + prioritizationConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_OUTPUT) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (streamingConsumerService != null) { + streamingConsumerService.shutDown() + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt new file mode 100644 index 000000000..d874cef92 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -0,0 +1,69 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import java.io.Serializable + +object MessageActionConstants { + const val PRIORITIZE = "prioritize" +} + +enum class MessageState(val id: String) { + NEW("new"), + WAIT("wait"), + EXPIRED("expired"), + PRIORITIZED("prioritized"), + AGGREGATED("aggregated"), + IGNORED("ignored"), + COMPLETED("completed"), +} + +open class PrioritizationConfiguration : Serializable { + lateinit var expiryConfiguration: ExpiryConfiguration + lateinit var shutDownConfiguration: ShutDownConfiguration + lateinit var cleanConfiguration: CleanConfiguration + lateinit var inputTopicSelector: String // Consumer Configuration Selector + lateinit var expiredTopic: String // Publish Configuration Selector + lateinit var outputTopic: String // Publish Configuration Selector +} + +open class ExpiryConfiguration : Serializable { + var frequencyMilli: Long = 30000L + var maxPollRecord: Int = 1000 +} + +open class ShutDownConfiguration : Serializable { + var waitMill: Long = 30000L +} + +open class CleanConfiguration : Serializable { + var frequencyMilli: Long = 30000L + var expiredRecordsHoldDays: Int = 5 +} + +open class UpdateStateRequest : Serializable { + lateinit var id: String + var group: String? = null + var state: String? = null + var notifyMessage: String? = null +} + +data class CorrelationCheckResponse(var message: String? = null, + var correlated: Boolean = false) + +data class TypeCorrelationKey(val type: String, val correlationId: String) + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt new file mode 100644 index 000000000..94fedf4df --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -0,0 +1,44 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + + +fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) + : ProcessorSupplier<K, V> { + return ProcessorSupplier<K, V> { + // Dynamically resolve the Prioritization Processor + val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) + processorInstance.prioritizationConfiguration = prioritizationConfiguration + processorInstance + } +} + +fun MessagePrioritization.toFormatedCorrelation(): String { + val ascendingKey = this.correlationId!!.split(",") + .map { it.trim() }.sorted().joinToString(",") + return ascendingKey +} + +fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { + val ascendingKey = this.correlationId!!.split(",") + .map { it.trim() }.sorted().joinToString(",") + return TypeCorrelationKey(this.type, ascendingKey) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt new file mode 100644 index 000000000..307d932a9 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import org.springframework.data.domain.Pageable +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.* + +@Repository +@Transactional(readOnly = true) +interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> { + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") + fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.createdDate asc") + fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.updatedDate asc") + fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable) + : List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List<String>, expiryCheckDate: Date, + count: Pageable): List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByStateInAndExpiredDate(states: List<String>, expiryCheckDate: Date, + count: Pageable): List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByGroupAndStateInAndExpiredDate(group: String, states: List<String>, expiryCheckDate: Date, + count: Pageable): List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") + fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") + fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String) + : List<MessagePrioritization>? + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") + fun findByGroupAndTypesAndCorrelationId(group: String, states: List<String>, types: List<String>, + correlationId: String): List<MessagePrioritization>? + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id") + fun setStatusForMessageId(id: String, state: String): Int + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids") + fun setStatusForMessageIds(ids: List<String>, state: String): Int + + @Modifying + @Transactional + @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " + + "WHERE pm.id = :id") + fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") + fun deleteGroup(group: String) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") + fun deleteGroupAndStateIn(group: String, states: List<String>) +} + diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt new file mode 100644 index 000000000..4973cdf6e --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt @@ -0,0 +1,74 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import com.fasterxml.jackson.annotation.JsonFormat +import org.hibernate.annotations.Proxy +import org.springframework.data.annotation.LastModifiedDate +import org.springframework.data.jpa.domain.support.AuditingEntityListener +import org.springframework.data.jpa.repository.config.EnableJpaAuditing +import java.util.* +import javax.persistence.* + +@EnableJpaAuditing +@EntityListeners(AuditingEntityListener::class) +@Entity +@Table(name = "MESSAGE_PRIORITIZATION") +@Proxy(lazy = false) +open class MessagePrioritization { + @Id + @Column(name = "message_id", length = 50) + lateinit var id: String + + @Column(name = "message_group", length = 50, nullable = false) + lateinit var group: String + + @Column(name = "message_type", length = 50, nullable = false) + lateinit var type: String + + /** States Defined by MessageState */ + @Column(name = "message_state", length = 20, nullable = false) + lateinit var state: String + + @Lob + @Column(name = "message", nullable = false) + var message: String? = null + + @Lob + @Column(name = "aggregated_message_ids", nullable = true) + var aggregatedMessageIds: String? = null + + @Lob + @Column(name = "correlation_id", nullable = true) + var correlationId: String? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "created_date", nullable = false) + var createdDate = Date() + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @LastModifiedDate + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "updated_date", nullable = false) + var updatedDate: Date? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "expiry_date", nullable = false) + var expiryDate: Date? = null +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt new file mode 100644 index 000000000..e4369fc20 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt @@ -0,0 +1,172 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.data.domain.PageRequest +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.util.* + +interface MessagePrioritizationStateService { + + suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization + + suspend fun getMessage(id: String): MessagePrioritization + + suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? + + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>? + + suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>? + + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? + + suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, correlationIds: String): List<MessagePrioritization>? + + suspend fun updateMessagesState(ids: List<String>, state: String) + + suspend fun updateMessageState(id: String, state: String): MessagePrioritization + + suspend fun setMessageState(id: String, state: String) + + suspend fun setMessagesState(ids: List<String>, state: String) + + suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List<String>): MessagePrioritization + + suspend fun deleteMessage(id: String) + + suspend fun deleteMessageByGroup(group: String) + + suspend fun deleteMessageStates(group: String, states: List<String>) + + suspend fun deleteExpiredMessage(group: String, retentionDays: Int) +} + +@Service +open class MessagePrioritizationStateServiceImpl( + private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService { + + private val log = logger(MessagePrioritizationStateServiceImpl::class) + + @Transactional + override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { + if (!message.correlationId.isNullOrBlank()) { + message.correlationId = message.toFormatedCorrelation() + } + message.updatedDate = Date() + return prioritizationMessageRepository.save(message) + } + + override suspend fun getMessage(id: String): MessagePrioritization { + return prioritizationMessageRepository.findById(id).orElseGet(null) + ?: throw BluePrintProcessorException("couldn't find message for id($id)") + } + + override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? { + return prioritizationMessageRepository + .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count)) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int) + : List<MessagePrioritization>? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group, + states, Date(), PageRequest.of(0, count)) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int) + : List<MessagePrioritization>? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group, + states, Date(), PageRequest.of(0, count)) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int) + : List<MessagePrioritization>? { + return prioritizationMessageRepository.findByByGroupAndExpiredDate(group, + expiryDate, PageRequest.of(0, count)) + } + + override suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, + correlationIds: String): List<MessagePrioritization>? { + return if (!types.isNullOrEmpty()) { + prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) + } else { + prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) + } + } + + override suspend fun updateMessagesState(ids: List<String>, state: String) { + ids.forEach { + val updated = updateMessageState(it, state) + log.info("message($it) update to state(${updated.state})") + } + } + + @Transactional + override suspend fun setMessageState(id: String, state: String) { + prioritizationMessageRepository.setStatusForMessageId(id, state) + } + + @Transactional + override suspend fun setMessagesState(ids: List<String>, state: String) { + prioritizationMessageRepository.setStatusForMessageIds(ids, state) + } + + @Transactional + override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + } + return saveMessage(updateMessage) + } + + override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>) + : MessagePrioritization { + + val groupedIds = groupedMessageIds.joinToString(",") + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + this.aggregatedMessageIds = groupedIds + } + return saveMessage(updateMessage) + } + + override suspend fun deleteMessage(id: String) { + return prioritizationMessageRepository.deleteById(id) + } + + override suspend fun deleteMessageByGroup(group: String) { + return prioritizationMessageRepository.deleteGroup(group) + } + + override suspend fun deleteMessageStates(group: String, states: List<String>) { + return prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + } + + override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { + return prioritizationMessageRepository.deleteGroupAndStateIn(group, + arrayListOf(MessageState.EXPIRED.name)) + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt new file mode 100644 index 000000000..8dd4019dd --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt @@ -0,0 +1,54 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() { + + private val log = logger(MessageAggregateProcessor::class) + + override suspend fun processNB(key: String, value: String) { + + log.info("@@@@@ received in aggregation processor key($key), value($value)") + val ids = value.split(",").map { it.trim() } + if (!ids.isNullOrEmpty()) { + if (ids.size == 1) { + processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) + } else { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(ids) + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) + } + } + } + + /** Child will override this implementation , if necessary */ + open suspend fun handleAggregation(messageIds: List<String>) { + log.info("messages($messageIds) aggregated") + messageIds.forEach { id -> + processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt new file mode 100644 index 000000000..34faa1b3b --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt @@ -0,0 +1,35 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() { + + private val log = logger(MessageOutputProcessor::class) + + override suspend fun processNB(key: String, value: String) { + log.info("$$$$$ received in output processor key($key), value($value)") + val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name) + processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt new file mode 100644 index 000000000..a745e034c --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) + : AbstractBluePrintMessagePunctuator() { + + private val log = logger(MessagePriorityExpiryPunctuator::class) + lateinit var configuration: PrioritizationConfiguration + + override suspend fun punctuateNB(timestamp: Long) { + + log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})") + val expiryConfiguration = configuration.expiryConfiguration + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + + val expiredIds = fetchMessages?.map { it.id } + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expired -> + processorContext.forward(expired.id, expired, + To.child(MessagePrioritizationConstants.SINK_EXPIRED)) + } + } + } +} + +class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) + : AbstractBluePrintMessagePunctuator() { + + private val log = logger(MessagePriorityCleanPunctuator::class) + lateinit var configuration: PrioritizationConfiguration + + override suspend fun punctuateNB(timestamp: Long) { + log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})") + //TODO + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt new file mode 100644 index 000000000..00d454727 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.nio.charset.Charset + +open class MessagePrioritizationSerde : Serde<MessagePrioritization> { + + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer<MessagePrioritization> { + return object : Deserializer<MessagePrioritization> { + override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { + return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer<MessagePrioritization> { + return object : Serializer<MessagePrioritization> { + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt new file mode 100644 index 000000000..5a5aa2575 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -0,0 +1,138 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.streams.processor.Cancellable +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.time.Duration +import java.util.* + + +open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { + + private val log = logger(MessagePrioritizeProcessor::class) + + lateinit var expiryCancellable: Cancellable + lateinit var cleanCancellable: Cancellable + + override suspend fun processNB(key: ByteArray, value: ByteArray) { + log.info("***** received in prioritize processor key(${String(key)})") + val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + // Save the Message + messagePrioritizationStateService.saveMessage(data) + handleCorrelationAndNextStep(data) + + } + + override fun init(context: ProcessorContext) { + super.init(context) + /** set up expiry marking cron */ + initializeExpiryPunctuator() + /** Set up cleaning records cron */ + initializeCleanPunctuator() + } + + override fun close() { + log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})") + expiryCancellable.cancel() + cleanCancellable.cancel() + } + + open fun initializeExpiryPunctuator() { + val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService) + expiryPunctuator.processorContext = processorContext + expiryPunctuator.configuration = prioritizationConfiguration + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli), + PunctuationType.WALL_CLOCK_TIME, expiryPunctuator) + log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") + } + + open fun initializeCleanPunctuator() { + val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService) + cleanPunctuator.processorContext = processorContext + cleanPunctuator.configuration = prioritizationConfiguration + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), + PunctuationType.WALL_CLOCK_TIME, cleanPunctuator) + log.info("Clean punctuator setup complete with expiry " + + "hold(${cleanConfiguration.expiredRecordsHoldDays})days") + } + + open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { + /** Check correlation enabled and correlation field has populated */ + if (!messagePrioritization.correlationId.isNullOrBlank()) { + val id = messagePrioritization.id + val group = messagePrioritization.group + val correlationId = messagePrioritization.correlationId!! + val types = getGroupCorrelationTypes(messagePrioritization) + log.info("checking correlation for message($id), group($group), types($types), " + + "correlation id($correlationId)") + + /** Get all previously received messages from database for group and optional types and correlation Id */ + val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId) + + /** If multiple records found, then check correlation */ + if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { + /** Check all correlation satisfies */ + val correlationResults = MessageCorrelationUtils + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + + if (correlationResults.correlated) { + /** Correlation satisfied */ + val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",") + /** Send only correlated ids to next processor */ + this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + } else { + /** Correlation not satisfied */ + log.trace("correlation not matched : ${correlationResults.message}") + val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id } + // Update the Message state to Wait + messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name) + } + } else { + /** received first message of group and correlation Id, update the message with wait state */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) + } + } else { + // No Correlation check needed, simply forward to next processor. + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) + this.processorContext.forward(messagePrioritization.id, messagePrioritization.id, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + } + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + return null + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt new file mode 100644 index 000000000..cc30af2f1 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -0,0 +1,82 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +object MessageCorrelationUtils { + + /** Assumption is message is of same group **/ + fun correlatedMessages(collectedMessages: List<MessagePrioritization>): CorrelationCheckResponse { + val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated") + if (collectedMessages.size > 1) { + val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() } + if (filteredMessage.isNotEmpty()) { + val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() } + if (groupedMessage.size == 1) { + correlationCheckResponse.correlated = true + correlationCheckResponse.message = null + } + } + } else { + correlationCheckResponse.message = "received only one message for that group" + } + return correlationCheckResponse + } + + /** Assumption is message is of same group and checking for required types **/ + fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?) + : CorrelationCheckResponse { + + return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { + + val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } + if (!unknownMessageTypes.isNullOrEmpty()) { + throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") + } + + val copyTypes = types.toTypedArray().copyOf().toMutableList() + + val filteredMessage = collectedMessages.filter { + !it.correlationId.isNullOrBlank() + && types.contains(it.type) + } + var correlatedKeys: MutableSet<String> = mutableSetOf() + if (filteredMessage.isNotEmpty()) { + val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } + val foundType = correlatedMap.keys.map { it.type } + copyTypes.removeAll(foundType) + correlatedKeys = correlatedMap.keys.map { + it.correlationId + }.toMutableSet() + } + /** Check if any Types missing and same correlation id for all types */ + return if (copyTypes.isEmpty()) { + if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) + else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") + } else { + CorrelationCheckResponse(message = "couldn't find types($copyTypes)") + } + } else { + return correlatedMessages(collectedMessages) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt new file mode 100644 index 000000000..3281a97f9 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -0,0 +1,103 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import java.util.* + +object MessagePrioritizationSample { + + fun samplePrioritizationConfiguration(): PrioritizationConfiguration { + return PrioritizationConfiguration().apply { + inputTopicSelector = "prioritize-input" + outputTopic = "prioritize-output-topic" + expiredTopic = "prioritize-expired-topic" + expiryConfiguration = ExpiryConfiguration().apply { + frequencyMilli = 10000L + maxPollRecord = 2000 + } + shutDownConfiguration = ShutDownConfiguration().apply { + waitMill = 2000L + } + cleanConfiguration = CleanConfiguration().apply { + frequencyMilli = 10000L + expiredRecordsHoldDays = 5 + } + } + } + + private fun currentDatePlusDays(days: Int): Date { + val calender = Calendar.getInstance() + calender.add(Calendar.DATE, days) + return calender.time + } + + fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> { + return sampleMessages("sample-group", messageState, count) + } + + fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> { + val messages: MutableList<MessagePrioritization> = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage(groupName, messageState, + "sample-type", null) + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> { + val messages: MutableList<MessagePrioritization> = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage(groupName, messageState, "sample-type", + "key1=value1,key2=value2") + messages.add(backPressureMessage) + } + return messages + } + + fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String, + count: Int): List<MessagePrioritization> { + val messages: MutableList<MessagePrioritization> = arrayListOf() + repeat(count) { + val backPressureMessage = createMessage(groupName, messageState, "type-$it", + "key1=value1,key2=value2") + messages.add(backPressureMessage) + } + return messages + } + + fun createMessage(groupName: String, messageState: String, messageType: String, + messageCorrelationId: String?): MessagePrioritization { + + return MessagePrioritization().apply { + id = UUID.randomUUID().toString() + group = groupName + type = messageType + state = messageState + correlationId = messageCorrelationId + message = "I am the Message" + createdDate = Date() + updatedDate = Date() + expiryDate = currentDatePlusDays(3) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt new file mode 100644 index 000000000..bd99f72d0 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -0,0 +1,175 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import io.mockk.coEvery +import io.mockk.every +import io.mockk.spyk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.context.ApplicationContext +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.Test +import kotlin.test.assertNotNull + + +@RunWith(SpringRunner::class) +@DataJpaTest +@DirtiesContext +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class, + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]) +@TestPropertySource(properties = +[ + "spring.jpa.show-sql=true", + "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" +]) +open class MessagePrioritizationConsumerTest { + + @Autowired + lateinit var applicationContext: ApplicationContext + + @Autowired + lateinit var prioritizationMessageRepository: PrioritizationMessageRepository + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Before + fun setup() { + BluePrintDependencyService.inject(applicationContext) + } + + @Test + fun testBluePrintKafkaJDBCKeyStore() { + runBlocking { + assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") + + val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService + .instance(MessagePrioritizationStateService::class) + assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") + + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { + val message = messagePrioritizationService.saveMessage(it) + val repoResult = messagePrioritizationService.getMessage(message.id) + assertNotNull(repoResult, "failed to get inserted message.") + } + } + } + + @Test + fun testStartConsuming() { + runBlocking { + val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() + + val streamingConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(configuration.inputTopicSelector) + assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") + + val spyStreamingConsumerService = spyk(streamingConsumerService) + coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit + coEvery { spyStreamingConsumerService.shutDown() } returns Unit + val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) + + + // Test Topology + val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) + val messageConsumerProperties = bluePrintMessageLibPropertyService + .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) + assertNotNull(topology, "failed to get create topology") + + every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService + spyMessagePrioritizationConsumer.startConsuming(configuration) + spyMessagePrioritizationConsumer.shutDown() + } + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testMessagePrioritizationConsumer() { + runBlocking { + val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) + + /** Send sample message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService + launch { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + delay(100) + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), + headers = headers) + } + + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), + headers = headers) + } + + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(2000) + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), + headers = headers) + } + } + delay(10000) + messagePrioritizationConsumer.shutDown() + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt new file mode 100644 index 000000000..4e3eb191b --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -0,0 +1,57 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.PrimaryDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Service +import javax.sql.DataSource + +@Configuration +@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"]) +@EnableAutoConfiguration +open class TestDatabaseConfiguration { + + @Bean("primaryDBLibGenericService") + open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService { + return PrimaryDBLibGenericService(NamedParameterJdbcTemplate(dataSource)) + } +} + +@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) +open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + return when (messagePrioritization.group) { + "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + else -> null + } + } +} + +@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) +open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() + +@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) +open class DefaultMessageOutputProcessor : MessageOutputProcessor()
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt new file mode 100644 index 000000000..b470db909 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils + +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import kotlin.test.assertTrue + +class MessageCorrelationUtilsTest { + + @Test + fun testCorrelationKeysReordered() { + + val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2") + val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-0", "key2=value2,key1=value1") + + val multipleMessages: MutableList<MessagePrioritization> = arrayListOf() + multipleMessages.add(message1) + multipleMessages.add(message2) + val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages) + assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered") + } + + @Test + fun differentTypesWithSameCorrelationMessages() { + /** With Types **/ + /* Assumption is Same group with different types */ + val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) + val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2")) + assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResponse") + + /* Assumption is Same group with different types and one missing expected types, + In this case type-3 message is missing */ + val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3")) + assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType") + } + + @Test + fun withSameCorrelationMessagesWithIgnoredTypes() { + /** With ignoring Types */ + /** Assumption is only one message received */ + val withSameCorrelationOneMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) + val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationOneMessages, null) + assertTrue(!withSameCorrelationOneMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp") + + /** Assumption is two message received for same group with same correlation */ + val withSameCorrelationMessages = MessagePrioritizationSample + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) + val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + withSameCorrelationMessages, null) + assertTrue(withSameCorrelationMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp") + } + + @Test + fun differentTypesWithDifferentCorrelationMessage() { + /** Assumption is two message received for same group with different expected types and different correlation */ + val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2") + val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, + "type-1", "key1=value1,key2=value3") + val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf() + differentTypesWithDifferentCorrelationMessage.add(message1) + differentTypesWithDifferentCorrelationMessage.add(message2) + val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( + differentTypesWithDifferentCorrelationMessage, + arrayListOf("type-0", "type-1")) + assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated, + "failed to correlate differentTypesWithDifferentCorrelationMessageResp") + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml new file mode 100644 index 000000000..e3a1f7a01 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml @@ -0,0 +1,42 @@ +<!-- + ~ Copyright © 2018-2019 AT&T Intellectual Property. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + + <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> + <property name="defaultPattern" + value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> + <property name="testing" + value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>${localPattern}</pattern> + </encoder> + </appender> + + <logger name="org.springframework.test" level="warn"/> + <logger name="org.springframework" level="warn"/> + <logger name="org.hibernate.type.descriptor.sql" level="warn"/> + <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> + + <root level="warn"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> diff --git a/ms/blueprintsprocessor/functions/pom.xml b/ms/blueprintsprocessor/functions/pom.xml index 3ee6737ef..38f9071ee 100755 --- a/ms/blueprintsprocessor/functions/pom.xml +++ b/ms/blueprintsprocessor/functions/pom.xml @@ -39,6 +39,7 @@ <module>restconf-executor</module> <module>cli-executor</module> <module>config-snapshots</module> + <module>message-prioritizaion</module> </modules> <dependencies> diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt new file mode 100644 index 000000000..4c6c0acdd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt @@ -0,0 +1,65 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.Punctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** CDS Kafka Stream Processor abstract class to implement */ +abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> { + + private val log = logger(AbstractBluePrintMessageProcessor::class) + + lateinit var processorContext: ProcessorContext + + + override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) { + try { + processNB(key, value) + } catch (e: Exception) { + log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e) + } + } + + override fun init(context: ProcessorContext) { + log.info("initializing processor (${this.javaClass.simpleName})") + this.processorContext = context + + } + + override fun close() { + log.info("closing processor (${this.javaClass.simpleName})") + } + + abstract suspend fun processNB(key: K, value: V) +} + +/** CDS Kafka Stream Punctuator abstract class to implement */ +abstract class AbstractBluePrintMessagePunctuator : Punctuator { + lateinit var processorContext: ProcessorContext + + + override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) { + punctuateNB(timestamp) + } + + abstract suspend fun punctuateNB(timestamp: Long) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt new file mode 100644 index 000000000..86ccd74a2 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt @@ -0,0 +1,143 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +/* +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.StateStore +import org.apache.kafka.streams.state.StoreBuilder +import org.apache.kafka.streams.state.StoreSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import java.util.* + + +class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> { + + override fun get(): KafkaJDBCStore { + // Get the DBLibGenericService Instance + val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService() + return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService) + } + + override fun name(): String { + return name + } + + override fun metricsScope(): String { + return "jdbc-state" + } +} + +class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier) + : StoreBuilder<KafkaJDBCStore> { + + private var logConfig: MutableMap<String, String> = HashMap() + private var enableCaching: Boolean = false + private var enableLogging = true + + override fun logConfig(): MutableMap<String, String> { + return logConfig + } + + override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> { + enableCaching = false + return this + } + + override fun loggingEnabled(): Boolean { + return enableLogging + } + + override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> { + enableLogging = false + return this + } + + override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> { + enableCaching = true + return this + } + + override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> { + enableLogging = true + return this + } + + override fun name(): String { + return "KafkaJDBCKeyStoreBuilder" + } + + override fun build(): KafkaJDBCStore { + return storeSupplier.get() + } +} + +interface KafkaJDBCStore : StateStore { + + suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> + + suspend fun update(sql: String, params: Map<String, Any>): Int +} + + +class KafkaJDBCStoreImpl(private val name: String, + private val bluePrintDBLibGenericService: BluePrintDBLibGenericService) + : KafkaJDBCStore { + + private val log = logger(KafkaJDBCStoreImpl::class) + + override fun isOpen(): Boolean { + log.info("isOpen...") + return true + } + + override fun init(context: ProcessorContext, root: StateStore) { + log.info("init...") + } + + override fun flush() { + log.info("flush...") + } + + override fun close() { + log.info("Close...") + } + + override fun name(): String { + return name + } + + override fun persistent(): Boolean { + return true + } + + override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.query(sql, params) + } + + override suspend fun update(sql: String, params: Map<String, Any>): Int { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.update(sql, params) + } +} +*/ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt index 229e462da..d0297df4c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope val streamsConfig = streamsConfig(additionalConfig) val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + log.info("Kafka streams topology : ${topology.describe()}") kafkaStreams = KafkaStreams(topology, streamsConfig) kafkaStreams.cleanUp() kafkaStreams.start() |