diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
28 files changed, 0 insertions, 2752 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md deleted file mode 100644 index cda43faca..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md +++ /dev/null @@ -1,26 +0,0 @@ - -To Delete Topics ------------------- -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic -kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog - -Create Topics --------------- - -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic - -To List topics ----------------- -kafka-topics --list --bootstrap-server localhost:9092 - -To publish message --------------------- -kafka-console-producer --broker-list localhost:9092 --topic prioritize-input-topic - -To Listen for Output ----------------------- -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml deleted file mode 100644 index e8467d02c..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Copyright © 2018-2019 AT&T Intellectual Property. - ~ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> - <artifactId>blueprintsprocessor-functions</artifactId> - <version>1.1.0-SNAPSHOT</version> - </parent> - - <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId> - <artifactId>message-prioritizaion</artifactId> - - <name>MS Blueprints Processor Functions - Message Prioritization</name> - <description>Blueprints Processor Function - Message Prioritization</description> - - <dependencies> - <dependency> - <groupId>org.onap.ccsdk.cds.blueprintsprocessor.modules</groupId> - <artifactId>message-lib</artifactId> - </dependency> - <dependency> - <groupId>com.h2database</groupId> - <artifactId>h2</artifactId> - </dependency> - </dependencies> -</project> diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt deleted file mode 100644 index 890e0a6ba..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration - -@Configuration -@ComponentScan -open class MessagePrioritizationConfiguration - -object MessagePrioritizationConstants { - - const val SOURCE_INPUT = "source-prioritization-input" - - const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" - - const val SINK_OUTPUT = "sink-prioritization-output" -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt deleted file mode 100644 index 65b7644a8..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import java.io.Serializable - -object MessageActionConstants { - - const val PRIORITIZE = "prioritize" -} - -enum class MessageState(val id: String) { - NEW("new"), - WAIT("wait"), - EXPIRED("expired"), - PRIORITIZED("prioritized"), - AGGREGATED("aggregated"), - COMPLETED("completed"), - ERROR("error") -} - -open class PrioritizationConfiguration : Serializable { - - lateinit var expiryConfiguration: ExpiryConfiguration - lateinit var shutDownConfiguration: ShutDownConfiguration - lateinit var cleanConfiguration: CleanConfiguration - var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration - var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration -} - -open class KafkaConfiguration : Serializable { - - lateinit var inputTopicSelector: String // Consumer Configuration Selector - lateinit var expiredTopic: String // Publish Configuration Selector - lateinit var outputTopic: String // Publish Configuration Selector -} - -open class NatsConfiguration : Serializable { - - lateinit var connectionSelector: String // Consumer Configuration Selector - lateinit var inputSubject: String // Publish Configuration Selector - lateinit var expiredSubject: String // Publish Configuration Selector - lateinit var outputSubject: String // Publish Configuration Selector -} - -open class ExpiryConfiguration : Serializable { - - var frequencyMilli: Long = 30000L - var maxPollRecord: Int = 1000 -} - -open class ShutDownConfiguration : Serializable { - - var waitMill: Long = 30000L -} - -open class CleanConfiguration : Serializable { - - var frequencyMilli: Long = 30000L - var expiredRecordsHoldDays: Int = 5 -} - -open class UpdateStateRequest : Serializable { - - lateinit var id: String - var group: String? = null - var state: String? = null -} - -data class CorrelationCheckResponse( - var message: String? = null, - var correlated: Boolean = false -) - -data class TypeCorrelationKey(val type: String, val correlationId: String) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt deleted file mode 100644 index dfe516953..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization - -interface MessagePrioritizationService { - - fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) - - fun getConfiguration(): PrioritizationConfiguration - - suspend fun prioritize(messagePrioritization: MessagePrioritization) - - /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */ - suspend fun output(messages: List<MessagePrioritization>) - - /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */ - suspend fun updateExpiredMessages() - - /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */ - suspend fun cleanExpiredMessage() -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt deleted file mode 100644 index 2e5e6c617..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.Date - -interface MessagePrioritizationStateService { - - suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization - - suspend fun getMessage(id: String): MessagePrioritization - - suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? - - suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? - - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? - - suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? - - suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? - - suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? - - suspend fun getCorrelatedMessages( - group: String, - states: List<String>, - types: List<String>?, - correlationIds: String - ): List<MessagePrioritization>? - - suspend fun updateMessagesState(ids: List<String>, state: String) - - suspend fun updateMessageState(id: String, state: String): MessagePrioritization - - suspend fun setMessageState(id: String, state: String) - - suspend fun setMessagesPriority(ids: List<String>, priority: String) - - suspend fun setMessagesState(ids: List<String>, state: String) - - suspend fun setMessageStateANdError(id: String, state: String, error: String) - - suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) - - suspend fun deleteMessage(id: String) - - suspend fun deleteMessages(id: List<String>) - - suspend fun deleteExpiredMessage(retentionDays: Int) - - suspend fun deleteMessageByGroup(group: String) - - suspend fun deleteMessageStates(group: String, states: List<String>) -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt deleted file mode 100644 index d8e71d413..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - -/** - * Register the MessagePrioritizationStateService and exposed dependency - */ -fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = - instance(MessagePrioritizationStateService::class) - -/** - * Expose messagePrioritizationStateService to AbstractComponentFunction - */ -fun AbstractComponentFunction.messagePrioritizationStateService() = - BluePrintDependencyService.messagePrioritizationStateService() - -/** - * MessagePrioritization correlation extensions - */ - -/** - * Arrange comma separated correlation keys in ascending order. - */ -fun MessagePrioritization.toFormatedCorrelation(): String { - return this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") -} - -/** - * Used to group the correlation with respect to types. - */ -fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { - return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) -} - -/** get list of message ids **/ -fun List<MessagePrioritization>.ids(): List<String> { - return this.map { it.id } -} - -/** Ordered by highest priority and updated date **/ -fun List<MessagePrioritization>.orderByHighestPriority(): List<MessagePrioritization> { - return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate)) -} - -/** Ordered by Updated date **/ -fun List<MessagePrioritization>.orderByUpdatedDate(): List<MessagePrioritization> { - return this.sortedWith(compareBy(MessagePrioritization::updatedDate)) -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt deleted file mode 100644 index c7aab03b6..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.mdcWebCoroutineScope -import org.springframework.http.MediaType -import org.springframework.web.bind.annotation.GetMapping -import org.springframework.web.bind.annotation.PathVariable -import org.springframework.web.bind.annotation.PostMapping -import org.springframework.web.bind.annotation.RequestBody -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.ResponseBody -import org.springframework.web.bind.annotation.RestController - -@RestController -@RequestMapping(value = ["/api/v1/message-prioritization"]) -open class MessagePrioritizationApi( - private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val messagePrioritizationService: MessagePrioritizationService -) { - - @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) - @ResponseBody - suspend fun ping(): String = mdcWebCoroutineScope { "Success" } - - @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) - @ResponseBody - suspend fun messagePrioritization(@PathVariable(value = "id") id: String) = mdcWebCoroutineScope { - messagePrioritizationStateService.getMessage(id) - } - - @PostMapping( - path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE] - ) - @ResponseBody - suspend fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = - mdcWebCoroutineScope { - messagePrioritizationStateService.saveMessage(messagePrioritization) - } - - @PostMapping( - path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE] - ) - @ResponseBody - suspend fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = mdcWebCoroutineScope { - messagePrioritizationService.prioritize(messagePrioritization) - } - - @PostMapping( - path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE] - ) - suspend fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = - mdcWebCoroutineScope { - messagePrioritizationStateService.setMessageState( - updateMessageState.id, - updateMessageState.state!! - ) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt deleted file mode 100644 index ce2085f68..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import com.fasterxml.jackson.annotation.JsonFormat -import org.hibernate.annotations.Proxy -import org.springframework.data.annotation.LastModifiedDate -import org.springframework.data.jpa.domain.support.AuditingEntityListener -import org.springframework.data.jpa.repository.config.EnableJpaAuditing -import java.util.Date -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.EntityListeners -import javax.persistence.Id -import javax.persistence.Lob -import javax.persistence.Table -import javax.persistence.Temporal -import javax.persistence.TemporalType - -@EnableJpaAuditing -@EntityListeners(AuditingEntityListener::class) -@Entity -@Table(name = "MESSAGE_PRIORITIZATION") -@Proxy(lazy = false) -open class MessagePrioritization { - - @Id - @Column(name = "message_id", length = 50) - lateinit var id: String - - @Column(name = "message_group", length = 50, nullable = false) - lateinit var group: String - - @Column(name = "message_type", length = 50, nullable = false) - lateinit var type: String - - /** States Defined by MessageState */ - @Column(name = "message_state", length = 20, nullable = false) - lateinit var state: String - - @Column(name = "priority", nullable = false) - var priority: Int = 5 - - @Lob - @Column(name = "message", nullable = false) - var message: String? = null - - @Lob - @Column(name = "error", nullable = true) - var error: String? = null - - @Lob - @Column(name = "aggregated_message_ids", nullable = true) - var aggregatedMessageIds: String? = null - - @Lob - @Column(name = "correlation_id", nullable = true) - var correlationId: String? = null - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "created_date", nullable = false) - var createdDate = Date() - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @LastModifiedDate - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "updated_date", nullable = false) - var updatedDate: Date? = null - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "expiry_date", nullable = false) - var expiryDate: Date? = null -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt deleted file mode 100644 index 0b35e3856..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import org.springframework.data.domain.Pageable -import org.springframework.data.jpa.repository.JpaRepository -import org.springframework.data.jpa.repository.Modifying -import org.springframework.data.jpa.repository.Query -import org.springframework.stereotype.Repository -import org.springframework.transaction.annotation.Transactional -import java.util.Date - -@Repository -@Transactional(readOnly = true) -interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> { - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") - fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.createdDate asc" - ) - fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.updatedDate asc" - ) - fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable): - List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByGroupAndStateInAndNotExpiredDate( - group: String, - states: List<String>, - expiryCheckDate: Date, - count: Pageable - ): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByStateInAndExpiredDate( - states: List<String>, - expiryCheckDate: Date, - count: Pageable - ): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByGroupAndStateInAndExpiredDate( - group: String, - states: List<String>, - expiryCheckDate: Date, - count: Pageable - ): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" - ) - fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" - ) - fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String): - List<MessagePrioritization>? - - @Query( - "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" - ) - fun findByGroupAndTypesAndCorrelationId( - group: String, - states: List<String>, - types: List<String>, - correlationId: String - ): List<MessagePrioritization>? - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id = :id" - ) - fun setStateForMessageId(id: String, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id = :id" - ) - fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id IN :ids" - ) - fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id IN :ids" - ) - fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + - "WHERE id = :id" - ) - fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query( - "UPDATE MessagePrioritization SET state = :state, " + - "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" - ) - fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE id IN :ids") - fun deleteByIds(ids: List<String>) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ") - fun deleteByExpiryDate(expiryCheckDate: Date) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE group = :group") - fun deleteGroup(group: String) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states") - fun deleteGroupAndStateIn(group: String, states: List<String>) -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt deleted file mode 100644 index 112a80379..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -abstract class AbstractKafkaMessagePrioritizationService( - private val messagePrioritizationStateService: MessagePrioritizationStateService -) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - - private val log = logger(AbstractKafkaMessagePrioritizationService::class) - - lateinit var processorContext: ProcessorContext - - fun setKafkaProcessorContext(processorContext: ProcessorContext) { - this.processorContext = processorContext - } - - override suspend fun output(messages: List<MessagePrioritization>) { - log.info("$$$$$ received in output processor id(${messages.ids()})") - checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } - check(::processorContext.isInitialized) { "failed to initialize kafka processor " } - - messages.forEach { message -> - val updatedMessage = - messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - processorContext.forward( - updatedMessage.id, - updatedMessage, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - - override suspend fun updateExpiredMessages() { - checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" } - check(::processorContext.isInitialized) { "failed to initialize kafka processor " } - - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() - try { - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - val expiredIds = fetchMessages?.ids() - if (expiredIds != null && expiredIds.isNotEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - fetchMessages.forEach { expiredMessage -> - expiredMessage.state = MessageState.EXPIRED.name - processorContext.forward( - expiredMessage.id, expiredMessage, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - } catch (e: Exception) { - log.error("failed in updating expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt deleted file mode 100644 index d4f8470c8..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor - -/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ -abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() { - - override fun init(processorContext: ProcessorContext) { - this.processorContext = processorContext - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt deleted file mode 100644 index 1b0612492..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils - -open class DefaultMessagePrioritizeProcessor( - private val messagePrioritizationStateService: MessagePrioritizationStateService, - private val kafkaMessagePrioritizationService: MessagePrioritizationService -) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { - - private val log = logger(DefaultMessagePrioritizeProcessor::class) - - override suspend fun processNB(key: ByteArray, value: ByteArray) { - - val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - try { - kafkaMessagePrioritizationService.prioritize(messagePrioritize) - } catch (e: Exception) { - messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" - log.error(messagePrioritize.error) - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritize.id, MessageState.ERROR.name, - messagePrioritize.error!! - ) - /** Publish to Output topic */ - this.processorContext.forward( - messagePrioritize.id, messagePrioritize, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - - override fun init(context: ProcessorContext) { - super.init(context) - /** Set Configuration and Processor Context to messagePrioritizationService */ - if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) { - kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext) - } else { - throw BluePrintProcessorException( - "messagePrioritizationService is not instance of " + - "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}" - ) - } - } - - override fun close() { - log.info( - "closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt deleted file mode 100644 index 4ab399f54..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList - -open class KafkaMessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, - private val kafkaMessagePrioritizationService: MessagePrioritizationService -) { - - private val log = logger(KafkaMessagePrioritizationConsumer::class) - - private lateinit var streamingConsumerService: BlueprintMessageConsumerService - - open fun consumerService(selector: String): BlueprintMessageConsumerService { - return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) - } - - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): - KafkaStreamConsumerFunction { - return object : KafkaStreamConsumerFunction { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map<String, Any>? - ): Topology { - - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() - log.info("Consuming prioritization topics($topics)") - - topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier<ByteArray, ByteArray>( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ), - MessagePrioritizationConstants.SOURCE_INPUT - ) - - /** To receive completed and error messages */ - topology.addSink( - MessagePrioritizationConstants.SINK_OUTPUT, - kafkaConsumerConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - // Output will be sent to the group-output topic from Processor API - return topology - } - } - } - - suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { - - val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration - ?: throw BluePrintProcessorException("failed to get kafka consumer configuration") - - streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector) - - // Dynamic Consumer Function to create Topology - val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) - streamingConsumerService.consume(null, consumerFunction) - } - - suspend fun shutDown() { - if (::streamingConsumerService.isInitialized) { - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt deleted file mode 100644 index 5595863d4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka - -import org.apache.kafka.common.serialization.Deserializer -import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.common.serialization.Serializer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.nio.charset.Charset - -open class MessagePrioritizationSerde : Serde<MessagePrioritization> { - - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - } - - override fun close() { - } - - override fun deserializer(): Deserializer<MessagePrioritization> { - return object : Deserializer<MessagePrioritization> { - override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { - return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - } - - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - } - - override fun close() { - } - } - } - - override fun serializer(): Serializer<MessagePrioritization> { - return object : Serializer<MessagePrioritization> { - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - } - - override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { - return data.asJsonString().toByteArray(Charset.defaultCharset()) - } - - override fun close() { - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt deleted file mode 100644 index 502a7822d..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils -import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -abstract class AbstractNatsMessagePrioritizationService( - private val messagePrioritizationStateService: MessagePrioritizationStateService -) : AbstractMessagePrioritizationService(messagePrioritizationStateService) { - - private val log = logger(AbstractNatsMessagePrioritizationService::class) - - lateinit var bluePrintNatsService: BluePrintNatsService - - override suspend fun output(messages: List<MessagePrioritization>) { - log.info("$$$$$ received in output processor id(${messages.ids()})") - checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } - check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } - - val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject - messages.forEach { message -> - val updatedMessage = - messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - - /** send to the output subject */ - bluePrintNatsService.publish( - NatsClusterUtils.currentApplicationSubject(outputSubject), - updatedMessage.asJsonType().asByteArray() - ) - } - } - - override suspend fun updateExpiredMessages() { - checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" } - check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" } - - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject - val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() - try { - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - val expiredIds = fetchMessages?.ids() - if (!expiredIds.isNullOrEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - fetchMessages.forEach { expiredMessage -> - expiredMessage.state = MessageState.EXPIRED.name - /** send to the output subject */ - bluePrintNatsService.publish( - NatsClusterUtils.currentApplicationSubject(outputSubject), - expiredMessage.asJsonType().asByteArray() - ) - } - } - } catch (e: Exception) { - log.error("failed in updating expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt deleted file mode 100644 index a0b2cf462..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats - -import io.nats.streaming.MessageHandler -import io.nats.streaming.Subscription -import kotlinx.coroutines.runBlocking -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.asType -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils - -open class NatsMessagePrioritizationConsumer( - private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService, - private val natsMessagePrioritizationService: MessagePrioritizationService -) { - - private val log = logger(NatsMessagePrioritizationConsumer::class) - - lateinit var bluePrintNatsService: BluePrintNatsService - private lateinit var subscription: Subscription - - suspend fun startConsuming() { - val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration() - val natsConfiguration = prioritizationConfiguration.natsConfiguration - ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration") - - check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) { - "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService." - } - bluePrintNatsService = consumerService(natsConfiguration.connectionSelector) - natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService - val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject) - val loadBalanceGroup = ClusterUtils.applicationName() - val messageHandler = createMessageHandler() - val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject)) - subscription = bluePrintNatsService.loadBalanceSubscribe( - inputSubject, - loadBalanceGroup, - messageHandler, - subscriptionOptions - ) - log.info( - "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)." - ) - } - - suspend fun shutDown() { - if (::subscription.isInitialized) { - subscription.unsubscribe() - } - log.info("Nats prioritization consumer listener shutdown complete") - } - - private fun consumerService(selector: String): BluePrintNatsService { - return bluePrintNatsLibPropertyService.bluePrintNatsService(selector) - } - - private fun createMessageHandler(): MessageHandler { - return MessageHandler { message -> - try { - val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java) - runBlocking { - natsMessagePrioritizationService.prioritize(messagePrioritization) - } - } catch (e: Exception) { - log.error("failed to process prioritize message", e) - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt deleted file mode 100644 index f4602a810..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/ -abstract class AbstractMessagePrioritizationService( - private val messagePrioritizationStateService: MessagePrioritizationStateService -) : MessagePrioritizationService { - - private val log = logger(AbstractMessagePrioritizationService::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - - override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) { - this.prioritizationConfiguration = prioritizationConfiguration - } - - override fun getConfiguration(): PrioritizationConfiguration { - return this.prioritizationConfiguration - } - - override suspend fun prioritize(messagePrioritize: MessagePrioritization) { - try { - log.info("***** received in prioritize processor key(${messagePrioritize.id})") - check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } - - /** Get the cluster lock for message group */ - val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) - // Save the Message - messagePrioritizationStateService.saveMessage(messagePrioritize) - handleCorrelationAndNextStep(messagePrioritize) - /** Cluster unLock for message group */ - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } catch (e: Exception) { - messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" - log.error(messagePrioritize.error) - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritize.id, MessageState.ERROR.name, - messagePrioritize.error!! - ) - } - } - - override suspend fun output(messages: List<MessagePrioritization>) { - log.info("$$$$$ received in output processor id(${messages.ids()})") - messages.forEach { message -> - messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name) - } - } - - override suspend fun updateExpiredMessages() { - check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } - - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - val clusterLock = MessageProcessorUtils.prioritizationExpiryLock() - try { - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - val expiredIds = fetchMessages?.ids() - if (!expiredIds.isNullOrEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - } - } catch (e: Exception) { - log.error("failed in updating expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } - - override suspend fun cleanExpiredMessage() { - check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " } - - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - val clusterLock = MessageProcessorUtils.prioritizationCleanLock() - try { - messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays) - } catch (e: Exception) { - log.error("failed in clean expired messages", e) - } finally { - MessageProcessorUtils.prioritizationUnLock(clusterLock) - } - } - - open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { - /** Check correlation enabled and correlation field has populated */ - if (!messagePrioritization.correlationId.isNullOrBlank()) { - val id = messagePrioritization.id - val group = messagePrioritization.group - val correlationId = messagePrioritization.correlationId!! - val types = getGroupCorrelationTypes(messagePrioritization) - log.info( - "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " + - "correlation types($types), priority(${messagePrioritization.priority}), " + - "correlation id($correlationId)" - ) - - /** Get all previously received messages from database for group and optional types and correlation Id */ - val waitingCorrelatedStoreMessages = messagePrioritizationStateService - .getCorrelatedMessages( - group, - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId - ) - - /** If multiple records found, then check correlation */ - if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { - /** Check all correlation satisfies */ - val correlationResults = MessageCorrelationUtils - .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) - - if (correlationResults.correlated) { - /** Update all messages to Aggregated state */ - messagePrioritizationStateService.setMessagesState( - waitingCorrelatedStoreMessages.ids(), - MessageState.PRIORITIZED.name - ) - /** Correlation satisfied, Send only correlated messages to aggregate processor */ - aggregate(waitingCorrelatedStoreMessages) - } else { - /** Correlation not satisfied */ - log.trace("correlation not matched : ${correlationResults.message}") - // Update the Message state to Wait - messagePrioritizationStateService.setMessagesState( - waitingCorrelatedStoreMessages.ids(), - MessageState.WAIT.name - ) - } - } else { - /** received first message of group and correlation Id, update the message with wait state */ - messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) - } - } else { - /** No Correlation check needed, simply forward to next processor. */ - messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) - aggregate(arrayListOf(messagePrioritization)) - } - } - - open suspend fun aggregate(messages: List<MessagePrioritization>) { - log.info("@@@@@ received in aggregation processor ids(${messages.ids()}") - if (!messages.isNullOrEmpty()) { - try { - /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ - handleAggregation(messages) - } catch (e: Exception) { - val error = "failed in aggregate message(${messages.ids()}) : ${e.message}" - if (!messages.isNullOrEmpty()) { - messages.forEach { messagePrioritization -> - try { - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritization.id, - MessageState.ERROR.name, error - ) - } catch (sendException: Exception) { - log.error( - "failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", - e - ) - } - } - /** Publish to output topic */ - output(messages) - } - } - } - } - - /** Child will override this implementation , if necessary - * Here the place child has to implement custom Sequencing and Aggregation logic. - * */ - abstract suspend fun handleAggregation(messages: List<MessagePrioritization>) - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt deleted file mode 100644 index 529d773a4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.springframework.stereotype.Service - -@Service -open class MessagePrioritizationSchedulerService( - private val messagePrioritizationService: MessagePrioritizationService -) { - - private val log = logger(MessagePrioritizationSchedulerService::class) - - @Volatile - var keepGoing = true - - /** This is sample scheduler implementation used during starting application with configuration. - @EventListener(ApplicationReadyEvent::class) - open fun init() = runBlocking { - log.info("Starting PrioritizationListeners...") - startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration()) - } - */ - - open suspend fun startScheduling() { - val prioritizationConfiguration = messagePrioritizationService.getConfiguration() - - log.info("Starting Prioritization Scheduler Service...") - GlobalScope.launch { - expiryScheduler(prioritizationConfiguration) - } - GlobalScope.launch { - cleanUpScheduler(prioritizationConfiguration) - } - } - - open suspend fun shutdownScheduling() { - keepGoing = false - val prioritizationConfiguration = messagePrioritizationService.getConfiguration() - delay(prioritizationConfiguration.shutDownConfiguration.waitMill) - } - - private suspend fun expiryScheduler( - prioritizationConfiguration: PrioritizationConfiguration - ) { - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec") - withContext(Dispatchers.Default) { - while (keepGoing) { - try { - messagePrioritizationService.updateExpiredMessages() - delay(expiryConfiguration.frequencyMilli) - } catch (e: Exception) { - log.error("failed in prioritization expiry scheduler", e) - } - } - } - } - - private suspend fun cleanUpScheduler( - prioritizationConfiguration: PrioritizationConfiguration - ) { - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec") - withContext(Dispatchers.Default) { - while (keepGoing) { - try { - messagePrioritizationService.cleanExpiredMessage() - delay(cleanConfiguration.frequencyMilli) - } catch (e: Exception) { - log.error("failed in prioritization clean scheduler", e) - } - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt deleted file mode 100644 index ed16fd44f..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate -import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate -import org.springframework.data.domain.PageRequest -import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import java.util.Date - -@Service -open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository -) : MessagePrioritizationStateService { - - private val log = logger(MessagePrioritizationStateServiceImpl::class) - - @Transactional - override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { - if (!message.correlationId.isNullOrBlank()) { - message.correlationId = message.toFormatedCorrelation() - } - message.updatedDate = Date() - return prioritizationMessageRepository.save(message) - } - - override suspend fun getMessage(id: String): MessagePrioritization { - return prioritizationMessageRepository.findById(id).orElseGet(null) - ?: throw BluePrintProcessorException("couldn't find message for id($id)") - } - - override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? { - return prioritizationMessageRepository.findAllById(ids) - } - - override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? { - return prioritizationMessageRepository - .findByStateInAndExpiredDate( - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), - Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? { - return prioritizationMessageRepository.findByExpiredDate( - expiryDate, PageRequest.of(0, count) - ) - } - - override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): - List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndExpiredDate( - group, - expiryDate, PageRequest.of(0, count) - ) - } - - override suspend fun getCorrelatedMessages( - group: String, - states: List<String>, - types: List<String>?, - correlationIds: String - ): List<MessagePrioritization>? { - return if (!types.isNullOrEmpty()) { - prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) - } else { - prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) - } - } - - @Transactional - override suspend fun updateMessagesState(ids: List<String>, state: String) { - ids.forEach { - val updated = updateMessageState(it, state) - log.info("message($it) update to state(${updated.state})") - } - } - - @Transactional - override suspend fun setMessageState(id: String, state: String) { - prioritizationMessageRepository.setStateForMessageId(id, state, Date()) - } - - @Transactional - override suspend fun setMessagesPriority(ids: List<String>, priority: String) { - prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date()) - } - - @Transactional - override suspend fun setMessagesState(ids: List<String>, state: String) { - prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) - } - - @Transactional - override suspend fun setMessageStateANdError(id: String, state: String, error: String) { - prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) - } - - @Transactional - override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { - val updateMessage = getMessage(id).apply { - this.updatedDate = Date() - this.state = state - } - return saveMessage(updateMessage) - } - - @Transactional - override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) { - val groupedIds = aggregatedIds.joinToString(",") - prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) - } - - override suspend fun deleteMessage(id: String) { - prioritizationMessageRepository.deleteById(id) - log.info("Prioritization Messages $id deleted successfully.") - } - - override suspend fun deleteMessages(ids: List<String>) { - prioritizationMessageRepository.deleteByIds(ids) - log.info("Prioritization Messages $ids deleted successfully.") - } - - override suspend fun deleteExpiredMessage(retentionDays: Int) { - val expiryCheckDate = controllerDate().addDate(retentionDays) - prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate) - } - - override suspend fun deleteMessageByGroup(group: String) { - prioritizationMessageRepository.deleteGroup(group) - log.info("Prioritization Messages group($group) deleted successfully.") - } - - override suspend fun deleteMessageStates(group: String, states: List<String>) { - prioritizationMessageRepository.deleteGroupAndStateIn(group, states) - log.info("Prioritization Messages group($group) with states($states) deleted successfully.") - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt deleted file mode 100644 index 305e64ba4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -/** Sample Prioritization Service, Define spring service injector to register in application*/ -open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractMessagePrioritizationService(messagePrioritizationStateService) { - - /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messages: List<MessagePrioritization>) { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - sampleMessagePrioritizationHandler.handleAggregation(messages) - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) - } -} - -open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) { - - /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messages: List<MessagePrioritization>) { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - sampleMessagePrioritizationHandler.handleAggregation(messages) - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) - } -} - -open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) { - - /** Child overriding this implementation , if necessary */ - override suspend fun handleAggregation(messages: List<MessagePrioritization>) { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - sampleMessagePrioritizationHandler.handleAggregation(messages) - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { - val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler( - this, messagePrioritizationStateService - ) - return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization) - } -} - -class SampleMessagePrioritizationHandler( - private val messagePrioritizationService: MessagePrioritizationService, - private val messagePrioritizationStateService: MessagePrioritizationStateService -) { - - private val log = logger(SampleMessagePrioritizationHandler::class) - - suspend fun handleAggregation(messages: List<MessagePrioritization>) { - log.info("messages(${messages.ids()}) aggregated") - /** Sequence based on Priority and Updated Date */ - val sequencedMessage = messages.orderByHighestPriority() - /** Update all messages to aggregated state */ - messagePrioritizationStateService.setMessagesState( - sequencedMessage.ids(), - MessageState.AGGREGATED.name - ) - messagePrioritizationService.output(sequencedMessage) - } - - fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { - return when (messagePrioritization.group) { - /** Dummy Implementation, This can also be read from file and stored as cached map **/ - "group-typed" -> arrayListOf("type-0", "type-1", "type-2") - "pass-typed" -> arrayListOf(messagePrioritization.type) - else -> null - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt deleted file mode 100644 index 7ab0be098..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException - -object MessageCorrelationUtils { - - /** Assumption is message is of same group **/ - fun correlatedMessages(collectedMessages: List<MessagePrioritization>): CorrelationCheckResponse { - val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated") - if (collectedMessages.size > 1) { - val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() } - if (filteredMessage.isNotEmpty()) { - val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() } - if (groupedMessage.size == 1) { - correlationCheckResponse.correlated = true - correlationCheckResponse.message = null - } - } - } else { - correlationCheckResponse.message = "received only one message for that group" - } - return correlationCheckResponse - } - - /** Assumption is message is of same group and checking for required types **/ - fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?): - CorrelationCheckResponse { - - return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { - - val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id } - if (!unknownMessageTypes.isNullOrEmpty()) { - throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)") - } - - val copyTypes = types.toTypedArray().copyOf().toMutableList() - - val filteredMessage = collectedMessages.filter { - !it.correlationId.isNullOrBlank() && - types.contains(it.type) - } - var correlatedKeys: MutableSet<String> = mutableSetOf() - if (filteredMessage.isNotEmpty()) { - val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() } - val foundType = correlatedMap.keys.map { it.type } - copyTypes.removeAll(foundType) - correlatedKeys = correlatedMap.keys.map { - it.correlationId - }.toMutableSet() - } - /** Check if any Types missing and same correlation id for all types */ - return if (copyTypes.isEmpty()) { - if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true) - else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)") - } else { - CorrelationCheckResponse(message = "couldn't find types($copyTypes)") - } - } else { - return correlatedMessages(collectedMessages) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt deleted file mode 100644 index 2c4ae30da..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate -import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate -import java.util.Date -import java.util.UUID - -object MessagePrioritizationSample { - - fun samplePrioritizationConfiguration(): PrioritizationConfiguration { - return PrioritizationConfiguration().apply { - kafkaConfiguration = KafkaConfiguration().apply { - inputTopicSelector = "prioritize-input" - outputTopic = "prioritize-output-topic" - expiredTopic = "prioritize-expired-topic" - } - natsConfiguration = NatsConfiguration().apply { - connectionSelector = "cds-controller" - inputSubject = "prioritize-input" - outputSubject = "prioritize-output" - expiredSubject = "prioritize-expired" - } - expiryConfiguration = ExpiryConfiguration().apply { - frequencyMilli = 10000L - maxPollRecord = 2000 - } - shutDownConfiguration = ShutDownConfiguration().apply { - waitMill = 2000L - } - cleanConfiguration = CleanConfiguration().apply { - frequencyMilli = 10000L - expiredRecordsHoldDays = 5 - } - } - } - - fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration { - return PrioritizationConfiguration().apply { - expiryConfiguration = ExpiryConfiguration().apply { - frequencyMilli = 10L - maxPollRecord = 2000 - } - shutDownConfiguration = ShutDownConfiguration().apply { - waitMill = 20L - } - cleanConfiguration = CleanConfiguration().apply { - frequencyMilli = 10L - expiredRecordsHoldDays = 5 - } - } - } - - private fun currentDatePlusDays(days: Int): Date { - return controllerDate().addDate(days) - } - - fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> { - return sampleMessages("sample-group", messageState, count) - } - - fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> { - val messages: MutableList<MessagePrioritization> = arrayListOf() - repeat(count) { - val backPressureMessage = createMessage( - groupName, messageState, - "sample-type", null - ) - messages.add(backPressureMessage) - } - return messages - } - - fun sampleMessageWithSameCorrelation( - groupName: String, - messageState: String, - count: Int - ): List<MessagePrioritization> { - val messages: MutableList<MessagePrioritization> = arrayListOf() - repeat(count) { - val backPressureMessage = createMessage( - groupName, messageState, "sample-type", - "key1=value1,key2=value2" - ) - messages.add(backPressureMessage) - } - return messages - } - - fun sampleMessageWithDifferentTypeSameCorrelation( - groupName: String, - messageState: String, - count: Int - ): List<MessagePrioritization> { - val messages: MutableList<MessagePrioritization> = arrayListOf() - repeat(count) { - val backPressureMessage = createMessage( - groupName, messageState, "type-$it", - "key1=value1,key2=value2" - ) - messages.add(backPressureMessage) - } - return messages - } - - fun createMessage( - groupName: String, - messageState: String, - messageType: String, - messageCorrelationId: String? - ): MessagePrioritization { - - return MessagePrioritization().apply { - id = UUID.randomUUID().toString() - group = groupName - type = messageType - state = messageState - priority = (1..10).shuffled().first() - correlationId = messageCorrelationId - message = "I am the Message" - createdDate = Date() - updatedDate = Date() - expiryDate = currentDatePlusDays(3) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt deleted file mode 100644 index 86cec3697..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.apache.kafka.streams.processor.ProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.core.cluster.optionalClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - -object MessageProcessorUtils { - - /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ - suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { - val clusterService = BluePrintDependencyService.optionalClusterService() - - return if (clusterService != null && clusterService.clusterJoined() && - !messagePrioritization.correlationId.isNullOrBlank() - ) { - // Get the correlation key in ascending order, even it it is misplaced - val correlationId = messagePrioritization.toFormatedCorrelation() - val lockName = "prioritize::${messagePrioritization.group}::$correlationId" - val clusterLock = clusterService.clusterLock(lockName) - clusterLock.lock() - if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") - clusterLock - } else null - } - - /** Utility to create the cluster lock for expiry scheduler*/ - suspend fun prioritizationExpiryLock(): ClusterLock? { - val clusterService = BluePrintDependencyService.optionalClusterService() - return if (clusterService != null && clusterService.clusterJoined()) { - val lockName = "prioritize-expiry" - val clusterLock = clusterService.clusterLock(lockName) - clusterLock.lock() - if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") - clusterLock - } else null - } - - /** Utility to create the cluster lock for expiry scheduler*/ - suspend fun prioritizationCleanLock(): ClusterLock? { - val clusterService = BluePrintDependencyService.optionalClusterService() - return if (clusterService != null && clusterService.clusterJoined()) { - val lockName = "prioritize-clean" - val clusterLock = clusterService.clusterLock(lockName) - clusterLock.lock() - if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") - clusterLock - } else null - } - - /** Utility used to cluster unlock for message [clusterLock] */ - suspend fun prioritizationUnLock(clusterLock: ClusterLock?) { - if (clusterLock != null) { - clusterLock.unLock() - clusterLock.close() - } - } - - /** Get the Kafka Supplier for processor lookup [name] **/ - fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> { - return ProcessorSupplier<K, V> { - // Dynamically resolve the Prioritization Processor - BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt deleted file mode 100644 index 286a9b5c1..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import io.mockk.coEvery -import io.mockk.every -import io.mockk.mockk -import io.mockk.spyk -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.junit.Before -import org.junit.runner.RunWith -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService -import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample -import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils -import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest -import org.springframework.context.ApplicationContext -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.ContextConfiguration -import org.springframework.test.context.TestPropertySource -import org.springframework.test.context.junit4.SpringRunner -import kotlin.test.Test -import kotlin.test.assertNotNull - -@RunWith(SpringRunner::class) -@DataJpaTest -@DirtiesContext -@ContextConfiguration( - classes = [ - BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, - MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class - ] -) -@TestPropertySource( - properties = - [ - "spring.jpa.show-sql=false", - "spring.jpa.properties.hibernate.show_sql=false", - "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", - - "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth", - "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", - "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", - "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword", - "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword", - "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user", - "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword", - - // To send initial test message - "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth", - "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic", - "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks", - "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword", - "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks", - "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword", - "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user", - "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword", - - "blueprintsprocessor.nats.cds-controller.type=token-auth", - "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222", - "blueprintsprocessor.nats.cds-controller.token=tokenAuth" - ] -) -open class MessagePrioritizationConsumerTest { - - private val log = logger(MessagePrioritizationConsumerTest::class) - - @Autowired - lateinit var applicationContext: ApplicationContext - - @Autowired - lateinit var prioritizationMessageRepository: PrioritizationMessageRepository - - @Autowired - lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService - - @Autowired - lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService - - @Autowired - lateinit var messagePrioritizationStateService: MessagePrioritizationStateService - - @Before - fun setup() { - BluePrintDependencyService.inject(applicationContext) - } - - @Test - fun testBluePrintKafkaJDBCKeyStore() { - runBlocking { - assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") - - val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService - .instance(MessagePrioritizationStateService::class) - assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") - - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { - val message = messagePrioritizationService.saveMessage(it) - val repoResult = messagePrioritizationService.getMessage(message.id) - assertNotNull(repoResult, "failed to get inserted message.") - } - } - } - - @Test - fun testMessagePrioritizationService() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - val messagePrioritizationService = - SampleMessagePrioritizationService(messagePrioritizationStateService) - messagePrioritizationService.setConfiguration(configuration) - - log.info("**************** without Correlation **************") - /** Checking without correlation */ - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { - messagePrioritizationService.prioritize(it) - } - log.info("**************** Same Group , with Correlation **************") - /** checking same group with correlation */ - MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(10) - messagePrioritizationService.prioritize(it) - } - log.info("**************** Different Type , with Correlation **************") - /** checking different type, with correlation */ - MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(10) - messagePrioritizationService.prioritize(it) - } - } - } - - @Test - fun testStartConsuming() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - - val streamingConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector) - assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") - - val spyStreamingConsumerService = spyk(streamingConsumerService) - coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit - coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService, mockk() - ) - val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) - - // Test Topology - val kafkaStreamConsumerFunction = - spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) - val messageConsumerProperties = bluePrintMessageLibPropertyService - .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") - val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) - assertNotNull(topology, "failed to get create topology") - - every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService - spyMessagePrioritizationConsumer.startConsuming(configuration) - spyMessagePrioritizationConsumer.shutDown() - } - } - - @Test - fun testSchedulerService() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - val messagePrioritizationService = - SampleMessagePrioritizationService(messagePrioritizationStateService) - messagePrioritizationService.setConfiguration(configuration) - - val messagePrioritizationSchedulerService = - MessagePrioritizationSchedulerService(messagePrioritizationService) - launch { - messagePrioritizationSchedulerService.startScheduling() - } - launch { - /** To debug increase the delay time */ - delay(20) - messagePrioritizationSchedulerService.shutdownScheduling() - } - } - } - - /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - // @Test - fun testKafkaMessagePrioritizationConsumer() { - runBlocking { - - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - val kafkaMessagePrioritizationService = - SampleKafkaMessagePrioritizationService(messagePrioritizationStateService) - kafkaMessagePrioritizationService.setConfiguration(configuration) - - val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor( - messagePrioritizationStateService, - kafkaMessagePrioritizationService - ) - - // Register the processor - BluePrintDependencyService.registerSingleton( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - defaultMessagePrioritizeProcessor - ) - - val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer( - bluePrintMessageLibPropertyService, - kafkaMessagePrioritizationService - ) - messagePrioritizationConsumer.startConsuming(configuration) - - /** Send sample message with every 1 sec */ - val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService - launch { - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { - delay(100) - val headers: MutableMap<String, String> = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB( - key = "mykey", - message = it.asJsonString(false), - headers = headers - ) - } - - MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(100) - val headers: MutableMap<String, String> = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB( - key = "mykey", - message = it.asJsonString(false), - headers = headers - ) - } - - MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(2000) - val headers: MutableMap<String, String> = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB( - key = "mykey", - message = it.asJsonString(false), - headers = headers - ) - } - } - delay(10000) - messagePrioritizationConsumer.shutDown() - } - } - - /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker - * Start : - * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V - * */ - // @Test - fun testNatsMessagePrioritizationConsumer() { - runBlocking { - val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() - assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration") - - val inputSubject = - NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject) - - val natsMessagePrioritizationService = - SampleNatsMessagePrioritizationService(messagePrioritizationStateService) - natsMessagePrioritizationService.setConfiguration(configuration) - - val messagePrioritizationConsumer = - NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService) - messagePrioritizationConsumer.startConsuming() - - /** Send sample message with every 1 sec */ - val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService - - launch { - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { - delay(100) - bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) - } - - MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(100) - bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) - } - - MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(200) - bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray()) - } - } - delay(3000) - messagePrioritizationConsumer.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt deleted file mode 100644 index 22c399608..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService -import org.springframework.boot.autoconfigure.EnableAutoConfiguration -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.ComponentScan -import org.springframework.context.annotation.Configuration -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate -import org.springframework.stereotype.Service -import javax.sql.DataSource - -@Configuration -@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"]) -@EnableAutoConfiguration -open class TestDatabaseConfiguration { - - @Bean("primaryDBLibGenericService") - open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService { - return PrimaryDBLibGenericService( - NamedParameterJdbcTemplate(dataSource) - ) - } -} - -/* Sample Prioritization Listener, used during Application startup -@Component -open class SamplePrioritizationListeners(private val defaultMessagePrioritizationConsumer: MessagePrioritizationConsumer) { - - private val log = logger(SamplePrioritizationListeners::class) - - @EventListener(ApplicationReadyEvent::class) - open fun init() = runBlocking { - log.info("Starting PrioritizationListeners...") - defaultMessagePrioritizationConsumer - .startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration()) - } - - @PreDestroy - open fun destroy() = runBlocking { - log.info("Shutting down PrioritizationListeners...") - defaultMessagePrioritizationConsumer.shutDown() - } -} - */ - -@Service -open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : - SampleMessagePrioritizationService(messagePrioritizationStateService) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt deleted file mode 100644 index 73d3738e5..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils - -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -class MessageCorrelationUtilsTest { - - @Test - fun testCorrelationKeysReordered() { - - val message1 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2" - ) - val message2 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-0", "key2=value2,key1=value1" - ) - - val multipleMessages: MutableList<MessagePrioritization> = arrayListOf() - multipleMessages.add(message1) - multipleMessages.add(message2) - val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages) - assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered") - } - - @Test - fun differentTypesWithSameCorrelationMessages() { - /** With Types **/ - /* Assumption is Same group with different types */ - val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) - val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2") - ) - assertTrue( - differentTypesWithSameCorrelationMessagesResponse.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResponse" - ) - - /* Assumption is Same group with different types and one missing expected types, - In this case type-3 message is missing */ - val differentTypesWithSameCorrelationMessagesResWithMissingType = - MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2", "type-3") - ) - assertTrue( - !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" - ) - } - - @Test - fun withSameCorrelationMessagesWithIgnoredTypes() { - /** With ignoring Types */ - /** Assumption is only one message received */ - val withSameCorrelationOneMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) - val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationOneMessages, null - ) - assertTrue( - !withSameCorrelationOneMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp" - ) - - /** Assumption is two message received for same group with same correlation */ - val withSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) - val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationMessages, null - ) - assertTrue( - withSameCorrelationMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp" - ) - } - - @Test - fun differentTypesWithDifferentCorrelationMessage() { - /** Assumption is two message received for same group with different expected types and different correlation */ - val message1 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2" - ) - val message2 = MessagePrioritizationSample.createMessage( - "sample-group", MessageState.NEW.name, - "type-1", "key1=value1,key2=value3" - ) - val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf() - differentTypesWithDifferentCorrelationMessage.add(message1) - differentTypesWithDifferentCorrelationMessage.add(message2) - val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithDifferentCorrelationMessage, - arrayListOf("type-0", "type-1") - ) - assertTrue( - !differentTypesWithDifferentCorrelationMessageResp.correlated, - "failed to correlate differentTypesWithDifferentCorrelationMessageResp" - ) - } - - @Test - fun testPrioritizationOrdering() { - val differentPriorityMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5) - val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority() - assertNotNull(orderedPriorityMessages, "failed to order the priority messages") - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml deleted file mode 100644 index e3a1f7a01..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml +++ /dev/null @@ -1,42 +0,0 @@ -<!-- - ~ Copyright © 2018-2019 AT&T Intellectual Property. - ~ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - - <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> - <property name="defaultPattern" - value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> - <property name="testing" - value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <!-- encoders are assigned the type - ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> - <encoder> - <pattern>${localPattern}</pattern> - </encoder> - </appender> - - <logger name="org.springframework.test" level="warn"/> - <logger name="org.springframework" level="warn"/> - <logger name="org.hibernate.type.descriptor.sql" level="warn"/> - <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> - - <root level="warn"> - <appender-ref ref="STDOUT"/> - </root> - -</configuration> |