summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt)6
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt83
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt7
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt9
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt33
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt)14
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt118
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt160
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt93
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt22
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt33
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt)4
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt69
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt10
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt39
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt7
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt100
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt83
21 files changed, 534 insertions, 365 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
index d114da521..c2965c4e8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
@@ -34,8 +34,6 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
this.processorContext = context
/** Get the State service to update in store */
this.messagePrioritizationStateService = BluePrintDependencyService
- .messagePrioritizationStateService()
-
+ .messagePrioritizationStateService()
}
-
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
index cce883c91..28e096352 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
@@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration
@ComponentScan
open class MessagePrioritizationConfiguration
-
object MessagePrioritizationConstants {
const val SOURCE_INPUT = "source-prioritization-input"
@@ -34,4 +33,4 @@ object MessagePrioritizationConstants {
const val SINK_OUTPUT = "sink-prioritization-output"
const val SINK_EXPIRED = "sink-prioritization-expired"
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
index 967cc190e..ed124d1b2 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
@@ -28,7 +28,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsume
import org.onap.ccsdk.cds.controllerblueprints.core.logger
open class MessagePrioritizationConsumer(
- private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) {
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+) {
private val log = logger(MessagePrioritizationConsumer::class)
@@ -36,15 +37,17 @@ open class MessagePrioritizationConsumer(
open fun consumerService(selector: String): BlueprintMessageConsumerService {
return bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(selector)
+ .blueprintMessageConsumerService(selector)
}
- open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration)
- : KafkaStreamConsumerFunction {
+ open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
+ KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
- override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?): Topology {
+ override suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology {
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
@@ -55,33 +58,49 @@ open class MessagePrioritizationConsumer(
topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
- topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- bluePrintProcessorSupplier<ByteArray, ByteArray>(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- prioritizationConfiguration),
- MessagePrioritizationConstants.SOURCE_INPUT)
-
- topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- prioritizationConfiguration),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-
- topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- prioritizationConfiguration),
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-
- topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED,
- prioritizationConfiguration.expiredTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-
- /** To receive completed and error messages */
- topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT,
- prioritizationConfiguration.outputTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ topology.addProcessor(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ bluePrintProcessorSupplier<ByteArray, ByteArray>(
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ prioritizationConfiguration
+ ),
+ MessagePrioritizationConstants.SOURCE_INPUT
+ )
+
+ topology.addProcessor(
+ MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
+ bluePrintProcessorSupplier<String, String>(
MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- MessagePrioritizationConstants.PROCESSOR_OUTPUT)
+ prioritizationConfiguration
+ ),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
+ )
+
+ topology.addProcessor(
+ MessagePrioritizationConstants.PROCESSOR_OUTPUT,
+ bluePrintProcessorSupplier<String, String>(
+ MessagePrioritizationConstants.PROCESSOR_OUTPUT,
+ prioritizationConfiguration
+ ),
+ MessagePrioritizationConstants.PROCESSOR_AGGREGATE
+ )
+
+ topology.addSink(
+ MessagePrioritizationConstants.SINK_EXPIRED,
+ prioritizationConfiguration.expiredTopic,
+ Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
+ )
+
+ /** To receive completed and error messages */
+ topology.addSink(
+ MessagePrioritizationConstants.SINK_OUTPUT,
+ prioritizationConfiguration.outputTopic,
+ Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
+ MessagePrioritizationConstants.PROCESSOR_OUTPUT
+ )
// Output will be sent to the group-output topic from Processor API
return topology
@@ -102,4 +121,4 @@ open class MessagePrioritizationConsumer(
streamingConsumerService.shutDown()
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index 3358a5643..3ecfa27e0 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -61,8 +61,9 @@ open class UpdateStateRequest : Serializable {
var state: String? = null
}
-data class CorrelationCheckResponse(var message: String? = null,
- var correlated: Boolean = false)
+data class CorrelationCheckResponse(
+ var message: String? = null,
+ var correlated: Boolean = false
+)
data class TypeCorrelationKey(val type: String, val correlationId: String)
-
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index ec061ad47..39d081455 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -21,30 +21,29 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-
/**
* Register the MessagePrioritizationStateService and exposed dependency
*/
fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService =
- instance(MessagePrioritizationStateService::class)
+ instance(MessagePrioritizationStateService::class)
/**
* Expose messagePrioritizationStateService to AbstractComponentFunction
*/
fun AbstractComponentFunction.messagePrioritizationStateService() =
- BluePrintDependencyService.messagePrioritizationStateService()
+ BluePrintDependencyService.messagePrioritizationStateService()
/**
* MessagePrioritization correlation extensions
*/
fun MessagePrioritization.toFormatedCorrelation(): String {
val ascendingKey = this.correlationId!!.split(",")
- .map { it.trim() }.sorted().joinToString(",")
+ .map { it.trim() }.sorted().joinToString(",")
return ascendingKey
}
fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
val ascendingKey = this.correlationId!!.split(",")
- .map { it.trim() }.sorted().joinToString(",")
+ .map { it.trim() }.sorted().joinToString(",")
return TypeCorrelationKey(this.type, ascendingKey)
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
index 382cb9c8a..262dcb402 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
@@ -21,7 +21,13 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
import org.springframework.http.MediaType
-import org.springframework.web.bind.annotation.*
+import org.springframework.web.bind.annotation.GetMapping
+import org.springframework.web.bind.annotation.PathVariable
+import org.springframework.web.bind.annotation.PostMapping
+import org.springframework.web.bind.annotation.RequestBody
+import org.springframework.web.bind.annotation.RequestMapping
+import org.springframework.web.bind.annotation.ResponseBody
+import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping(value = ["/api/v1/message-prioritization"])
@@ -31,25 +37,30 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic
@ResponseBody
fun ping(): String = "Success"
-
@GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@ResponseBody
fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc {
messagePrioritizationStateService.getMessage(id)
}
- @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE],
- consumes = [MediaType.APPLICATION_JSON_VALUE])
+ @PostMapping(
+ path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
@ResponseBody
fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
messagePrioritizationStateService.saveMessage(messagePrioritization)
}
- @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
- consumes = [MediaType.APPLICATION_JSON_VALUE])
+ @PostMapping(
+ path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) =
- monoMdc {
- messagePrioritizationStateService.setMessageState(updateMessageState.id,
- updateMessageState.state!!)
- }
-} \ No newline at end of file
+ monoMdc {
+ messagePrioritizationStateService.setMessageState(
+ updateMessageState.id,
+ updateMessageState.state!!
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt
index 15e85b0e7..ce2085f68 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt
@@ -21,8 +21,15 @@ import org.hibernate.annotations.Proxy
import org.springframework.data.annotation.LastModifiedDate
import org.springframework.data.jpa.domain.support.AuditingEntityListener
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
-import java.util.*
-import javax.persistence.*
+import java.util.Date
+import javax.persistence.Column
+import javax.persistence.Entity
+import javax.persistence.EntityListeners
+import javax.persistence.Id
+import javax.persistence.Lob
+import javax.persistence.Table
+import javax.persistence.Temporal
+import javax.persistence.TemporalType
@EnableJpaAuditing
@EntityListeners(AuditingEntityListener::class)
@@ -30,6 +37,7 @@ import javax.persistence.*
@Table(name = "MESSAGE_PRIORITIZATION")
@Proxy(lazy = false)
open class MessagePrioritization {
+
@Id
@Column(name = "message_id", length = 50)
lateinit var id: String
@@ -78,4 +86,4 @@ open class MessagePrioritization {
@Temporal(TemporalType.TIMESTAMP)
@Column(name = "expiry_date", nullable = false)
var expiryDate: Date? = null
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
deleted file mode 100644
index 5c2495fd7..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
-
-import org.springframework.data.domain.Pageable
-import org.springframework.data.jpa.repository.JpaRepository
-import org.springframework.data.jpa.repository.Modifying
-import org.springframework.data.jpa.repository.Query
-import org.springframework.stereotype.Repository
-import org.springframework.transaction.annotation.Transactional
-import java.util.*
-
-@Repository
-@Transactional(readOnly = true)
-interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> {
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc")
- fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "ORDER BY pm.createdDate asc")
- fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "ORDER BY pm.updatedDate asc")
- fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable)
- : List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc")
- fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List<String>, expiryCheckDate: Date,
- count: Pageable): List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.state in :states " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
- fun findByStateInAndExpiredDate(states: List<String>, expiryCheckDate: Date,
- count: Pageable): List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
- fun findByGroupAndStateInAndExpiredDate(group: String, states: List<String>, expiryCheckDate: Date,
- count: Pageable): List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group " +
- "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
- fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc")
- fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String)
- : List<MessagePrioritization>?
-
- @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
- "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc")
- fun findByGroupAndTypesAndCorrelationId(group: String, states: List<String>, types: List<String>,
- correlationId: String): List<MessagePrioritization>?
-
- @Modifying
- @Transactional
- @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
- "WHERE id = :id")
- fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
- "WHERE id = :id")
- fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
- "WHERE id IN :ids")
- fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
- "WHERE id IN :ids")
- fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
- "WHERE id = :id")
- fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("UPDATE MessagePrioritization SET state = :state, " +
- "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id")
- fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
-
- @Modifying
- @Transactional
- @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group")
- fun deleteGroup(group: String)
-
- @Modifying
- @Transactional
- @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states")
- fun deleteGroupAndStateIn(group: String, states: List<String>)
-}
-
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
new file mode 100644
index 000000000..b0514838a
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
@@ -0,0 +1,160 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import org.springframework.data.domain.Pageable
+import org.springframework.data.jpa.repository.JpaRepository
+import org.springframework.data.jpa.repository.Modifying
+import org.springframework.data.jpa.repository.Query
+import org.springframework.stereotype.Repository
+import org.springframework.transaction.annotation.Transactional
+import java.util.Date
+
+@Repository
+@Transactional(readOnly = true)
+interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> {
+
+ @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc")
+ fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "ORDER BY pm.updatedDate asc"
+ )
+ fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable):
+ List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndStateInAndNotExpiredDate(
+ group: String,
+ states: List<String>,
+ expiryCheckDate: Date,
+ count: Pageable
+ ): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.state in :states " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByStateInAndExpiredDate(
+ states: List<String>,
+ expiryCheckDate: Date,
+ count: Pageable
+ ): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndStateInAndExpiredDate(
+ group: String,
+ states: List<String>,
+ expiryCheckDate: Date,
+ count: Pageable
+ ): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group " +
+ "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+ )
+ fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String):
+ List<MessagePrioritization>?
+
+ @Query(
+ "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+ "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+ )
+ fun findByGroupAndTypesAndCorrelationId(
+ group: String,
+ states: List<String>,
+ types: List<String>,
+ correlationId: String
+ ): List<MessagePrioritization>?
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+ "WHERE id = :id"
+ )
+ fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
+ "WHERE id = :id"
+ )
+ fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+ "WHERE id IN :ids"
+ )
+ fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
+ "WHERE id IN :ids"
+ )
+ fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
+ "WHERE id = :id"
+ )
+ fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query(
+ "UPDATE MessagePrioritization SET state = :state, " +
+ "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
+ )
+ fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group")
+ fun deleteGroup(group: String)
+
+ @Modifying
+ @Transactional
+ @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states")
+ fun deleteGroupAndStateIn(group: String, states: List<String>)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
index 6138fa9d3..017658ff6 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
@@ -25,7 +25,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.data.domain.PageRequest
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
-import java.util.*
+import java.util.Date
interface MessagePrioritizationStateService {
@@ -37,16 +37,20 @@ interface MessagePrioritizationStateService {
suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
- suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
- : List<MessagePrioritization>?
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
- suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
- : List<MessagePrioritization>?
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
- suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
- correlationIds: String): List<MessagePrioritization>?
+ suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>?
suspend fun updateMessagesState(ids: List<String>, state: String)
@@ -73,8 +77,9 @@ interface MessagePrioritizationStateService {
@Service
open class MessagePrioritizationStateServiceImpl(
- private val prioritizationMessageRepository: PrioritizationMessageRepository)
- : MessagePrioritizationStateService {
+ private val prioritizationMessageRepository: PrioritizationMessageRepository
+) :
+ MessagePrioritizationStateService {
private val log = logger(MessagePrioritizationStateServiceImpl::class)
@@ -89,7 +94,7 @@ open class MessagePrioritizationStateServiceImpl(
override suspend fun getMessage(id: String): MessagePrioritization {
return prioritizationMessageRepository.findById(id).orElseGet(null)
- ?: throw BluePrintProcessorException("couldn't find message for id($id)")
+ ?: throw BluePrintProcessorException("couldn't find message for id($id)")
}
override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
@@ -98,30 +103,42 @@ open class MessagePrioritizationStateServiceImpl(
override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
return prioritizationMessageRepository
- .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
- Date(), PageRequest.of(0, count))
- }
-
- override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
- : List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group,
- states, Date(), PageRequest.of(0, count))
- }
-
- override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
- : List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group,
- states, Date(), PageRequest.of(0, count))
- }
-
- override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int)
- : List<MessagePrioritization>? {
- return prioritizationMessageRepository.findByByGroupAndExpiredDate(group,
- expiryDate, PageRequest.of(0, count))
- }
-
- override suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
- correlationIds: String): List<MessagePrioritization>? {
+ .findByStateInAndExpiredDate(
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
+ Date(), PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
+ group,
+ states, Date(), PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
+ group,
+ states, Date(), PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
+ List<MessagePrioritization>? {
+ return prioritizationMessageRepository.findByByGroupAndExpiredDate(
+ group,
+ expiryDate, PageRequest.of(0, count)
+ )
+ }
+
+ override suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>? {
return if (!types.isNullOrEmpty()) {
prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
} else {
@@ -185,7 +202,9 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
- return prioritizationMessageRepository.deleteGroupAndStateIn(group,
- arrayListOf(MessageState.EXPIRED.name))
+ return prioritizationMessageRepository.deleteGroupAndStateIn(
+ group,
+ arrayListOf(MessageState.EXPIRED.name)
+ )
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
index 45f5c773d..3e697e633 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
@@ -22,7 +22,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
private val log = logger(MessageAggregateProcessor::class)
@@ -50,16 +49,21 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String
storeMessages.forEach { messagePrioritization ->
try {
/** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id,
- MessageState.ERROR.name, error)
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritization.id,
+ MessageState.ERROR.name, error
+ )
/** Publish to Error topic */
- this.processorContext.forward(messagePrioritization.id, messagePrioritization,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ this.processorContext.forward(
+ messagePrioritization.id, messagePrioritization,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
} catch (sendException: Exception) {
- log.error("failed to update/publish error message(${messagePrioritization.id}) : " +
- "${sendException.message}", e)
+ log.error(
+ "failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}", e
+ )
}
-
}
}
}
@@ -73,4 +77,4 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String
processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
index 34faa1b3b..cf6520df5 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
@@ -22,7 +22,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
private val log = logger(MessageOutputProcessor::class)
@@ -32,4 +31,4 @@ open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, S
val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
index a745e034c..5435ebe30 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
@@ -24,41 +24,46 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
- : AbstractBluePrintMessagePunctuator() {
+class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractBluePrintMessagePunctuator() {
private val log = logger(MessagePriorityExpiryPunctuator::class)
lateinit var configuration: PrioritizationConfiguration
override suspend fun punctuateNB(timestamp: Long) {
- log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})")
+ log.info(
+ "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
val expiryConfiguration = configuration.expiryConfiguration
val fetchMessages = messagePrioritizationStateService
- .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
val expiredIds = fetchMessages?.map { it.id }
if (expiredIds != null && expiredIds.isNotEmpty()) {
messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
fetchMessages.forEach { expired ->
- processorContext.forward(expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_EXPIRED))
+ processorContext.forward(
+ expired.id, expired,
+ To.child(MessagePrioritizationConstants.SINK_EXPIRED)
+ )
}
}
}
}
-class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
- : AbstractBluePrintMessagePunctuator() {
+class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractBluePrintMessagePunctuator() {
private val log = logger(MessagePriorityCleanPunctuator::class)
lateinit var configuration: PrioritizationConfiguration
override suspend fun punctuateNB(timestamp: Long) {
- log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})")
- //TODO
+ log.info(
+ "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
+ // TODO
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt
index 00d454727..f2a481f74 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt
@@ -37,7 +37,7 @@ open class MessagePrioritizationSerde : Serde<MessagePrioritization> {
return object : Deserializer<MessagePrioritization> {
override fun deserialize(topic: String, data: ByteArray): MessagePrioritization {
return JacksonUtils.readValue(String(data), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ ?: throw BluePrintProcessorException("failed to convert")
}
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
@@ -61,4 +61,4 @@ open class MessagePrioritizationSerde : Serde<MessagePrioritization> {
}
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
index 7dde2655d..431e02f30 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
@@ -29,8 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import java.time.Duration
-import java.util.*
-
+import java.util.UUID
open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
@@ -42,7 +41,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
override suspend fun processNB(key: ByteArray, value: ByteArray) {
log.info("***** received in prioritize processor key(${String(key)})")
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ ?: throw BluePrintProcessorException("failed to convert")
try {
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
@@ -51,11 +50,15 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
/** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name,
- messagePrioritize.error!!)
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
/** Publish to Output topic */
- this.processorContext.forward(messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ this.processorContext.forward(
+ messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
}
}
@@ -68,8 +71,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
}
override fun close() {
- log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})")
+ log.info(
+ "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
expiryCancellable.cancel()
cleanCancellable.cancel()
}
@@ -79,8 +84,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
expiryPunctuator.processorContext = processorContext
expiryPunctuator.configuration = prioritizationConfiguration
val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator)
+ expiryCancellable = processorContext.schedule(
+ Duration.ofMillis(expiryConfiguration.frequencyMilli),
+ PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+ )
log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
}
@@ -89,10 +96,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
cleanPunctuator.processorContext = processorContext
cleanPunctuator.configuration = prioritizationConfiguration
val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator)
- log.info("Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days")
+ cleanCancellable = processorContext.schedule(
+ Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+ PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+ )
+ log.info(
+ "Clean punctuator setup complete with expiry " +
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ )
}
open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
@@ -102,25 +113,31 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
val group = messagePrioritization.group
val correlationId = messagePrioritization.correlationId!!
val types = getGroupCorrelationTypes(messagePrioritization)
- log.info("checking correlation for message($id), group($group), types($types), " +
- "correlation id($correlationId)")
+ log.info(
+ "checking correlation for message($id), group($group), types($types), " +
+ "correlation id($correlationId)"
+ )
/** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId)
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
/** If multiple records found, then check correlation */
if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
/** Check all correlation satisfies */
val correlationResults = MessageCorrelationUtils
- .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
+ .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
if (correlationResults.correlated) {
/** Correlation satisfied */
val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
/** Send only correlated ids to next processor */
- this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+ this.processorContext.forward(
+ UUID.randomUUID().toString(), correlatedIds,
+ To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+ )
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
@@ -135,8 +152,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
} else {
// No Correlation check needed, simply forward to next processor.
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- this.processorContext.forward(messagePrioritization.id, messagePrioritization.id,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+ this.processorContext.forward(
+ messagePrioritization.id, messagePrioritization.id,
+ To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+ )
}
}
@@ -145,4 +164,4 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return null
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
index cc30af2f1..fb35df75b 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
@@ -43,8 +43,8 @@ object MessageCorrelationUtils {
}
/** Assumption is message is of same group and checking for required types **/
- fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?)
- : CorrelationCheckResponse {
+ fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?):
+ CorrelationCheckResponse {
return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
@@ -56,8 +56,8 @@ object MessageCorrelationUtils {
val copyTypes = types.toTypedArray().copyOf().toMutableList()
val filteredMessage = collectedMessages.filter {
- !it.correlationId.isNullOrBlank()
- && types.contains(it.type)
+ !it.correlationId.isNullOrBlank() &&
+ types.contains(it.type)
}
var correlatedKeys: MutableSet<String> = mutableSetOf()
if (filteredMessage.isNotEmpty()) {
@@ -79,4 +79,4 @@ object MessageCorrelationUtils {
return correlatedMessages(collectedMessages)
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
index 185022973..4a36a40f3 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -21,7 +21,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.E
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import java.util.*
+import java.util.Calendar
+import java.util.Date
+import java.util.UUID
object MessagePrioritizationSample {
@@ -57,8 +59,10 @@ object MessagePrioritizationSample {
fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
val messages: MutableList<MessagePrioritization> = arrayListOf()
repeat(count) {
- val backPressureMessage = createMessage(groupName, messageState,
- "sample-type", null)
+ val backPressureMessage = createMessage(
+ groupName, messageState,
+ "sample-type", null
+ )
messages.add(backPressureMessage)
}
return messages
@@ -67,26 +71,37 @@ object MessagePrioritizationSample {
fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
val messages: MutableList<MessagePrioritization> = arrayListOf()
repeat(count) {
- val backPressureMessage = createMessage(groupName, messageState, "sample-type",
- "key1=value1,key2=value2")
+ val backPressureMessage = createMessage(
+ groupName, messageState, "sample-type",
+ "key1=value1,key2=value2"
+ )
messages.add(backPressureMessage)
}
return messages
}
- fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String,
- count: Int): List<MessagePrioritization> {
+ fun sampleMessageWithDifferentTypeSameCorrelation(
+ groupName: String,
+ messageState: String,
+ count: Int
+ ): List<MessagePrioritization> {
val messages: MutableList<MessagePrioritization> = arrayListOf()
repeat(count) {
- val backPressureMessage = createMessage(groupName, messageState, "type-$it",
- "key1=value1,key2=value2")
+ val backPressureMessage = createMessage(
+ groupName, messageState, "type-$it",
+ "key1=value1,key2=value2"
+ )
messages.add(backPressureMessage)
}
return messages
}
- fun createMessage(groupName: String, messageState: String, messageType: String,
- messageCorrelationId: String?): MessagePrioritization {
+ fun createMessage(
+ groupName: String,
+ messageState: String,
+ messageType: String,
+ messageCorrelationId: String?
+ ): MessagePrioritization {
return MessagePrioritization().apply {
id = UUID.randomUUID().toString()
@@ -101,4 +116,4 @@ object MessagePrioritizationSample {
expiryDate = currentDatePlusDays(3)
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 02614d821..7e5862cce 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -23,8 +23,8 @@ import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyS
object MessageProcessorUtils {
- fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
- : ProcessorSupplier<K, V> {
+ fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
+ ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
@@ -32,5 +32,4 @@ object MessageProcessorUtils {
processorInstance
}
}
-
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 84f13dc1d..0ed9598f0 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -44,29 +44,32 @@ import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
-
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
- MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class])
-@TestPropertySource(properties =
-[
- "spring.jpa.show-sql=true",
- "spring.jpa.properties.hibernate.show_sql=true",
- "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
-
- "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
- "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
- "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
-
- // To send initial test message
- "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
-])
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
+ MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
+)
+@TestPropertySource(
+ properties =
+ [
+ "spring.jpa.show-sql=true",
+ "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+ "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+
+ // To send initial test message
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+ ]
+)
open class MessagePrioritizationConsumerTest {
@Autowired
@@ -89,7 +92,7 @@ open class MessagePrioritizationConsumerTest {
assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
- .instance(MessagePrioritizationStateService::class)
+ .instance(MessagePrioritizationStateService::class)
assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
@@ -106,7 +109,7 @@ open class MessagePrioritizationConsumerTest {
val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
val streamingConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService(configuration.inputTopicSelector)
+ .blueprintMessageConsumerService(configuration.inputTopicSelector)
assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
val spyStreamingConsumerService = spyk(streamingConsumerService)
@@ -115,11 +118,10 @@ open class MessagePrioritizationConsumerTest {
val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
-
// Test Topology
val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
val messageConsumerProperties = bluePrintMessageLibPropertyService
- .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+ .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
assertNotNull(topology, "failed to get create topology")
@@ -130,7 +132,7 @@ open class MessagePrioritizationConsumerTest {
}
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- //@Test
+ // @Test
fun testMessagePrioritizationConsumer() {
runBlocking {
val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
@@ -138,38 +140,44 @@ open class MessagePrioritizationConsumerTest {
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
launch {
- MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
delay(100)
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
- headers = headers)
+ blueprintMessageProducerService.sendMessageNB(
+ message = it.asJsonString(false),
+ headers = headers
+ )
}
MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
- .forEach {
- delay(100)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
- headers = headers)
- }
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
- .forEach {
- delay(2000)
- val headers: MutableMap<String, String> = hashMapOf()
- headers["id"] = it.id
- blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
- headers = headers)
- }
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(2000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.id
+ blueprintMessageProducerService.sendMessageNB(
+ message = it.asJsonString(false),
+ headers = headers
+ )
+ }
}
delay(10000)
messagePrioritizationConsumer.shutDown()
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 4e3eb191b..be65c1d7c 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -42,6 +42,7 @@ open class TestDatabaseConfiguration {
@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+
override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return when (messagePrioritization.group) {
"group-typed" -> arrayListOf("type-0", "type-1", "type-2")
@@ -54,4 +55,4 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class DefaultMessageOutputProcessor : MessageOutputProcessor() \ No newline at end of file
+open class DefaultMessageOutputProcessor : MessageOutputProcessor()
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
index b470db909..3876cbba5 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
@@ -26,10 +26,14 @@ class MessageCorrelationUtilsTest {
@Test
fun testCorrelationKeysReordered() {
- val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
- "type-0", "key1=value1,key2=value2")
- val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
- "type-0", "key2=value2,key1=value1")
+ val message1 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2"
+ )
+ val message2 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key2=value2,key1=value1"
+ )
val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
multipleMessages.add(message1)
@@ -43,20 +47,26 @@ class MessageCorrelationUtilsTest {
/** With Types **/
/* Assumption is Same group with different types */
val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
- .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
+ .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithSameCorrelationMessages,
- arrayListOf("type-0", "type-1", "type-2"))
- assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated,
- "failed to correlate differentTypesWithSameCorrelationMessagesResponse")
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2")
+ )
+ assertTrue(
+ differentTypesWithSameCorrelationMessagesResponse.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResponse"
+ )
/* Assumption is Same group with different types and one missing expected types,
In this case type-3 message is missing */
val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithSameCorrelationMessages,
- arrayListOf("type-0", "type-1", "type-2", "type-3"))
- assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
- "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType")
+ differentTypesWithSameCorrelationMessages,
+ arrayListOf("type-0", "type-1", "type-2", "type-3")
+ )
+ assertTrue(
+ !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
+ "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType"
+ )
}
@Test
@@ -64,35 +74,48 @@ class MessageCorrelationUtilsTest {
/** With ignoring Types */
/** Assumption is only one message received */
val withSameCorrelationOneMessages = MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
- withSameCorrelationOneMessages, null)
- assertTrue(!withSameCorrelationOneMessagesResp.correlated,
- "failed to correlate withSameCorrelationMessagesResp")
+ withSameCorrelationOneMessages, null
+ )
+ assertTrue(
+ !withSameCorrelationOneMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp"
+ )
/** Assumption is two message received for same group with same correlation */
val withSameCorrelationMessages = MessagePrioritizationSample
- .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
+ .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
- withSameCorrelationMessages, null)
- assertTrue(withSameCorrelationMessagesResp.correlated,
- "failed to correlate withSameCorrelationMessagesResp")
+ withSameCorrelationMessages, null
+ )
+ assertTrue(
+ withSameCorrelationMessagesResp.correlated,
+ "failed to correlate withSameCorrelationMessagesResp"
+ )
}
@Test
fun differentTypesWithDifferentCorrelationMessage() {
/** Assumption is two message received for same group with different expected types and different correlation */
- val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
- "type-0", "key1=value1,key2=value2")
- val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
- "type-1", "key1=value1,key2=value3")
+ val message1 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-0", "key1=value1,key2=value2"
+ )
+ val message2 = MessagePrioritizationSample.createMessage(
+ "sample-group", MessageState.NEW.name,
+ "type-1", "key1=value1,key2=value3"
+ )
val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
differentTypesWithDifferentCorrelationMessage.add(message1)
differentTypesWithDifferentCorrelationMessage.add(message2)
val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
- differentTypesWithDifferentCorrelationMessage,
- arrayListOf("type-0", "type-1"))
- assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated,
- "failed to correlate differentTypesWithDifferentCorrelationMessageResp")
+ differentTypesWithDifferentCorrelationMessage,
+ arrayListOf("type-0", "type-1")
+ )
+ assertTrue(
+ !differentTypesWithDifferentCorrelationMessageResp.correlated,
+ "failed to correlate differentTypesWithDifferentCorrelationMessageResp"
+ )
}
-} \ No newline at end of file
+}