diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src')
21 files changed, 534 insertions, 365 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt index d114da521..c2965c4e8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -34,8 +34,6 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa this.processorContext = context /** Get the State service to update in store */ this.messagePrioritizationStateService = BluePrintDependencyService - .messagePrioritizationStateService() - + .messagePrioritizationStateService() } - -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt index cce883c91..28e096352 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration @ComponentScan open class MessagePrioritizationConfiguration - object MessagePrioritizationConstants { const val SOURCE_INPUT = "source-prioritization-input" @@ -34,4 +33,4 @@ object MessagePrioritizationConstants { const val SINK_OUTPUT = "sink-prioritization-output" const val SINK_EXPIRED = "sink-prioritization-expired" -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index 967cc190e..ed124d1b2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -28,7 +28,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsume import org.onap.ccsdk.cds.controllerblueprints.core.logger open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) { + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) { private val log = logger(MessagePrioritizationConsumer::class) @@ -36,15 +37,17 @@ open class MessagePrioritizationConsumer( open fun consumerService(selector: String): BlueprintMessageConsumerService { return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) + .blueprintMessageConsumerService(selector) } - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration) - : KafkaStreamConsumerFunction { + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { return object : KafkaStreamConsumerFunction { - override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map<String, Any>?): Topology { + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>? + ): Topology { val topology = Topology() val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties @@ -55,33 +58,49 @@ open class MessagePrioritizationConsumer( topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier<ByteArray, ByteArray>(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration), - MessagePrioritizationConstants.SOURCE_INPUT) - - topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - prioritizationConfiguration), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) - - topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT, - bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_OUTPUT, - prioritizationConfiguration), - MessagePrioritizationConstants.PROCESSOR_AGGREGATE) - - topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED, - prioritizationConfiguration.expiredTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) - - /** To receive completed and error messages */ - topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT, - prioritizationConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier<ByteArray, ByteArray>( MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + prioritizationConfiguration + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + bluePrintProcessorSupplier<String, String>( MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - MessagePrioritizationConstants.PROCESSOR_OUTPUT) + prioritizationConfiguration + ), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_OUTPUT, + bluePrintProcessorSupplier<String, String>( + MessagePrioritizationConstants.PROCESSOR_OUTPUT, + prioritizationConfiguration + ), + MessagePrioritizationConstants.PROCESSOR_AGGREGATE + ) + + topology.addSink( + MessagePrioritizationConstants.SINK_EXPIRED, + prioritizationConfiguration.expiredTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + prioritizationConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + MessagePrioritizationConstants.PROCESSOR_OUTPUT + ) // Output will be sent to the group-output topic from Processor API return topology @@ -102,4 +121,4 @@ open class MessagePrioritizationConsumer( streamingConsumerService.shutDown() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 3358a5643..3ecfa27e0 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -61,8 +61,9 @@ open class UpdateStateRequest : Serializable { var state: String? = null } -data class CorrelationCheckResponse(var message: String? = null, - var correlated: Boolean = false) +data class CorrelationCheckResponse( + var message: String? = null, + var correlated: Boolean = false +) data class TypeCorrelationKey(val type: String, val correlationId: String) - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index ec061ad47..39d081455 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -21,30 +21,29 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - /** * Register the MessagePrioritizationStateService and exposed dependency */ fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = - instance(MessagePrioritizationStateService::class) + instance(MessagePrioritizationStateService::class) /** * Expose messagePrioritizationStateService to AbstractComponentFunction */ fun AbstractComponentFunction.messagePrioritizationStateService() = - BluePrintDependencyService.messagePrioritizationStateService() + BluePrintDependencyService.messagePrioritizationStateService() /** * MessagePrioritization correlation extensions */ fun MessagePrioritization.toFormatedCorrelation(): String { val ascendingKey = this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") + .map { it.trim() }.sorted().joinToString(",") return ascendingKey } fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { val ascendingKey = this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") + .map { it.trim() }.sorted().joinToString(",") return TypeCorrelationKey(this.type, ascendingKey) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt index 382cb9c8a..262dcb402 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -21,7 +21,13 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc import org.springframework.http.MediaType -import org.springframework.web.bind.annotation.* +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.ResponseBody +import org.springframework.web.bind.annotation.RestController @RestController @RequestMapping(value = ["/api/v1/message-prioritization"]) @@ -31,25 +37,30 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic @ResponseBody fun ping(): String = "Success" - @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc { messagePrioritizationStateService.getMessage(id) } - @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE]) + @PostMapping( + path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) @ResponseBody fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc { messagePrioritizationStateService.saveMessage(messagePrioritization) } - @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE]) + @PostMapping( + path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = - monoMdc { - messagePrioritizationStateService.setMessageState(updateMessageState.id, - updateMessageState.state!!) - } -}
\ No newline at end of file + monoMdc { + messagePrioritizationStateService.setMessageState( + updateMessageState.id, + updateMessageState.state!! + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt index 15e85b0e7..ce2085f68 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt @@ -21,8 +21,15 @@ import org.hibernate.annotations.Proxy import org.springframework.data.annotation.LastModifiedDate import org.springframework.data.jpa.domain.support.AuditingEntityListener import org.springframework.data.jpa.repository.config.EnableJpaAuditing -import java.util.* -import javax.persistence.* +import java.util.Date +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.EntityListeners +import javax.persistence.Id +import javax.persistence.Lob +import javax.persistence.Table +import javax.persistence.Temporal +import javax.persistence.TemporalType @EnableJpaAuditing @EntityListeners(AuditingEntityListener::class) @@ -30,6 +37,7 @@ import javax.persistence.* @Table(name = "MESSAGE_PRIORITIZATION") @Proxy(lazy = false) open class MessagePrioritization { + @Id @Column(name = "message_id", length = 50) lateinit var id: String @@ -78,4 +86,4 @@ open class MessagePrioritization { @Temporal(TemporalType.TIMESTAMP) @Column(name = "expiry_date", nullable = false) var expiryDate: Date? = null -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt deleted file mode 100644 index 5c2495fd7..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import org.springframework.data.domain.Pageable -import org.springframework.data.jpa.repository.JpaRepository -import org.springframework.data.jpa.repository.Modifying -import org.springframework.data.jpa.repository.Query -import org.springframework.stereotype.Repository -import org.springframework.transaction.annotation.Transactional -import java.util.* - -@Repository -@Transactional(readOnly = true) -interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> { - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") - fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.createdDate asc") - fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.updatedDate asc") - fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable) - : List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List<String>, expiryCheckDate: Date, - count: Pageable): List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByStateInAndExpiredDate(states: List<String>, expiryCheckDate: Date, - count: Pageable): List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByGroupAndStateInAndExpiredDate(group: String, states: List<String>, expiryCheckDate: Date, - count: Pageable): List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") - fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String) - : List<MessagePrioritization>? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") - fun findByGroupAndTypesAndCorrelationId(group: String, states: List<String>, types: List<String>, - correlationId: String): List<MessagePrioritization>? - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id = :id") - fun setStateForMessageId(id: String, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id = :id") - fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id IN :ids") - fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id IN :ids") - fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + - "WHERE id = :id") - fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, " + - "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id") - fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") - fun deleteGroup(group: String) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") - fun deleteGroupAndStateIn(group: String, states: List<String>) -} - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt new file mode 100644 index 000000000..b0514838a --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt @@ -0,0 +1,160 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import org.springframework.data.domain.Pageable +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Repository +@Transactional(readOnly = true) +interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> { + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") + fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.updatedDate asc" + ) + fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable): + List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndNotExpiredDate( + group: String, + states: List<String>, + expiryCheckDate: Date, + count: Pageable + ): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByStateInAndExpiredDate( + states: List<String>, + expiryCheckDate: Date, + count: Pageable + ): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndExpiredDate( + group: String, + states: List<String>, + expiryCheckDate: Date, + count: Pageable + ): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String): + List<MessagePrioritization>? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndTypesAndCorrelationId( + group: String, + states: List<String>, + types: List<String>, + correlationId: String + ): List<MessagePrioritization>? + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateForMessageId(id: String, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, " + + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" + ) + fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") + fun deleteGroup(group: String) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") + fun deleteGroupAndStateIn(group: String, states: List<String>) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt index 6138fa9d3..017658ff6 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt @@ -25,7 +25,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.data.domain.PageRequest import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional -import java.util.* +import java.util.Date interface MessagePrioritizationStateService { @@ -37,16 +37,20 @@ interface MessagePrioritizationStateService { suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int) - : List<MessagePrioritization>? + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): + List<MessagePrioritization>? - suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int) - : List<MessagePrioritization>? + suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): + List<MessagePrioritization>? suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? - suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, - correlationIds: String): List<MessagePrioritization>? + suspend fun getCorrelatedMessages( + group: String, + states: List<String>, + types: List<String>?, + correlationIds: String + ): List<MessagePrioritization>? suspend fun updateMessagesState(ids: List<String>, state: String) @@ -73,8 +77,9 @@ interface MessagePrioritizationStateService { @Service open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository) - : MessagePrioritizationStateService { + private val prioritizationMessageRepository: PrioritizationMessageRepository +) : + MessagePrioritizationStateService { private val log = logger(MessagePrioritizationStateServiceImpl::class) @@ -89,7 +94,7 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getMessage(id: String): MessagePrioritization { return prioritizationMessageRepository.findById(id).orElseGet(null) - ?: throw BluePrintProcessorException("couldn't find message for id($id)") + ?: throw BluePrintProcessorException("couldn't find message for id($id)") } override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? { @@ -98,30 +103,42 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? { return prioritizationMessageRepository - .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), - Date(), PageRequest.of(0, count)) - } - - override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int) - : List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group, - states, Date(), PageRequest.of(0, count)) - } - - override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int) - : List<MessagePrioritization>? { - return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group, - states, Date(), PageRequest.of(0, count)) - } - - override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int) - : List<MessagePrioritization>? { - return prioritizationMessageRepository.findByByGroupAndExpiredDate(group, - expiryDate, PageRequest.of(0, count)) - } - - override suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, - correlationIds: String): List<MessagePrioritization>? { + .findByStateInAndExpiredDate( + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): + List<MessagePrioritization>? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): + List<MessagePrioritization>? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): + List<MessagePrioritization>? { + return prioritizationMessageRepository.findByByGroupAndExpiredDate( + group, + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getCorrelatedMessages( + group: String, + states: List<String>, + types: List<String>?, + correlationIds: String + ): List<MessagePrioritization>? { return if (!types.isNullOrEmpty()) { prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) } else { @@ -185,7 +202,9 @@ open class MessagePrioritizationStateServiceImpl( } override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { - return prioritizationMessageRepository.deleteGroupAndStateIn(group, - arrayListOf(MessageState.EXPIRED.name)) + return prioritizationMessageRepository.deleteGroupAndStateIn( + group, + arrayListOf(MessageState.EXPIRED.name) + ) } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt index 45f5c773d..3e697e633 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt @@ -22,7 +22,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.controllerblueprints.core.logger - open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() { private val log = logger(MessageAggregateProcessor::class) @@ -50,16 +49,21 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String storeMessages.forEach { messagePrioritization -> try { /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id, - MessageState.ERROR.name, error) + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritization.id, + MessageState.ERROR.name, error + ) /** Publish to Error topic */ - this.processorContext.forward(messagePrioritization.id, messagePrioritization, - To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + this.processorContext.forward( + messagePrioritization.id, messagePrioritization, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) } catch (sendException: Exception) { - log.error("failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", e) + log.error( + "failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", e + ) } - } } } @@ -73,4 +77,4 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt index 34faa1b3b..cf6520df5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt @@ -22,7 +22,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.controllerblueprints.core.logger - open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() { private val log = logger(MessageOutputProcessor::class) @@ -32,4 +31,4 @@ open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, S val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name) processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt index a745e034c..5435ebe30 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt @@ -24,41 +24,46 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator import org.onap.ccsdk.cds.controllerblueprints.core.logger - -class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) - : AbstractBluePrintMessagePunctuator() { +class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractBluePrintMessagePunctuator() { private val log = logger(MessagePriorityExpiryPunctuator::class) lateinit var configuration: PrioritizationConfiguration override suspend fun punctuateNB(timestamp: Long) { - log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})") + log.info( + "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) val expiryConfiguration = configuration.expiryConfiguration val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) val expiredIds = fetchMessages?.map { it.id } if (expiredIds != null && expiredIds.isNotEmpty()) { messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) fetchMessages.forEach { expired -> - processorContext.forward(expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_EXPIRED)) + processorContext.forward( + expired.id, expired, + To.child(MessagePrioritizationConstants.SINK_EXPIRED) + ) } } } } -class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) - : AbstractBluePrintMessagePunctuator() { +class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractBluePrintMessagePunctuator() { private val log = logger(MessagePriorityCleanPunctuator::class) lateinit var configuration: PrioritizationConfiguration override suspend fun punctuateNB(timestamp: Long) { - log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})") - //TODO + log.info( + "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + // TODO } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt index 00d454727..f2a481f74 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt @@ -37,7 +37,7 @@ open class MessagePrioritizationSerde : Serde<MessagePrioritization> { return object : Deserializer<MessagePrioritization> { override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") + ?: throw BluePrintProcessorException("failed to convert") } override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { @@ -61,4 +61,4 @@ open class MessagePrioritizationSerde : Serde<MessagePrioritization> { } } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 7dde2655d..431e02f30 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -29,8 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import java.time.Duration -import java.util.* - +import java.util.UUID open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { @@ -42,7 +41,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA override suspend fun processNB(key: ByteArray, value: ByteArray) { log.info("***** received in prioritize processor key(${String(key)})") val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") + ?: throw BluePrintProcessorException("failed to convert") try { // Save the Message messagePrioritizationStateService.saveMessage(messagePrioritize) @@ -51,11 +50,15 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name, - messagePrioritize.error!!) + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) /** Publish to Output topic */ - this.processorContext.forward(messagePrioritize.id, messagePrioritize, - To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + this.processorContext.forward( + messagePrioritize.id, messagePrioritize, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) } } @@ -68,8 +71,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA } override fun close() { - log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})") + log.info( + "closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) expiryCancellable.cancel() cleanCancellable.cancel() } @@ -79,8 +84,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA expiryPunctuator.processorContext = processorContext expiryPunctuator.configuration = prioritizationConfiguration val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli), - PunctuationType.WALL_CLOCK_TIME, expiryPunctuator) + expiryCancellable = processorContext.schedule( + Duration.ofMillis(expiryConfiguration.frequencyMilli), + PunctuationType.WALL_CLOCK_TIME, expiryPunctuator + ) log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") } @@ -89,10 +96,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA cleanPunctuator.processorContext = processorContext cleanPunctuator.configuration = prioritizationConfiguration val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), - PunctuationType.WALL_CLOCK_TIME, cleanPunctuator) - log.info("Clean punctuator setup complete with expiry " + - "hold(${cleanConfiguration.expiredRecordsHoldDays})days") + cleanCancellable = processorContext.schedule( + Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), + PunctuationType.WALL_CLOCK_TIME, cleanPunctuator + ) + log.info( + "Clean punctuator setup complete with expiry " + + "hold(${cleanConfiguration.expiredRecordsHoldDays})days" + ) } open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { @@ -102,25 +113,31 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA val group = messagePrioritization.group val correlationId = messagePrioritization.correlationId!! val types = getGroupCorrelationTypes(messagePrioritization) - log.info("checking correlation for message($id), group($group), types($types), " + - "correlation id($correlationId)") + log.info( + "checking correlation for message($id), group($group), types($types), " + + "correlation id($correlationId)" + ) /** Get all previously received messages from database for group and optional types and correlation Id */ - val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group, - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId) + val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages( + group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId + ) /** If multiple records found, then check correlation */ if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { /** Check all correlation satisfies */ val correlationResults = MessageCorrelationUtils - .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) if (correlationResults.correlated) { /** Correlation satisfied */ val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",") /** Send only correlated ids to next processor */ - this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + this.processorContext.forward( + UUID.randomUUID().toString(), correlatedIds, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) + ) } else { /** Correlation not satisfied */ log.trace("correlation not matched : ${correlationResults.message}") @@ -135,8 +152,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA } else { // No Correlation check needed, simply forward to next processor. messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) - this.processorContext.forward(messagePrioritization.id, messagePrioritization.id, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + this.processorContext.forward( + messagePrioritization.id, messagePrioritization.id, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) + ) } } @@ -145,4 +164,4 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { return null } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt index cc30af2f1..fb35df75b 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -43,8 +43,8 @@ object MessageCorrelationUtils { } /** Assumption is message is of same group and checking for required types **/ - fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?) - : CorrelationCheckResponse { + fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?): + CorrelationCheckResponse { return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { @@ -56,8 +56,8 @@ object MessageCorrelationUtils { val copyTypes = types.toTypedArray().copyOf().toMutableList() val filteredMessage = collectedMessages.filter { - !it.correlationId.isNullOrBlank() - && types.contains(it.type) + !it.correlationId.isNullOrBlank() && + types.contains(it.type) } var correlatedKeys: MutableSet<String> = mutableSetOf() if (filteredMessage.isNotEmpty()) { @@ -79,4 +79,4 @@ object MessageCorrelationUtils { return correlatedMessages(collectedMessages) } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index 185022973..4a36a40f3 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -21,7 +21,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.E import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.* +import java.util.Calendar +import java.util.Date +import java.util.UUID object MessagePrioritizationSample { @@ -57,8 +59,10 @@ object MessagePrioritizationSample { fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> { val messages: MutableList<MessagePrioritization> = arrayListOf() repeat(count) { - val backPressureMessage = createMessage(groupName, messageState, - "sample-type", null) + val backPressureMessage = createMessage( + groupName, messageState, + "sample-type", null + ) messages.add(backPressureMessage) } return messages @@ -67,26 +71,37 @@ object MessagePrioritizationSample { fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> { val messages: MutableList<MessagePrioritization> = arrayListOf() repeat(count) { - val backPressureMessage = createMessage(groupName, messageState, "sample-type", - "key1=value1,key2=value2") + val backPressureMessage = createMessage( + groupName, messageState, "sample-type", + "key1=value1,key2=value2" + ) messages.add(backPressureMessage) } return messages } - fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String, - count: Int): List<MessagePrioritization> { + fun sampleMessageWithDifferentTypeSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List<MessagePrioritization> { val messages: MutableList<MessagePrioritization> = arrayListOf() repeat(count) { - val backPressureMessage = createMessage(groupName, messageState, "type-$it", - "key1=value1,key2=value2") + val backPressureMessage = createMessage( + groupName, messageState, "type-$it", + "key1=value1,key2=value2" + ) messages.add(backPressureMessage) } return messages } - fun createMessage(groupName: String, messageState: String, messageType: String, - messageCorrelationId: String?): MessagePrioritization { + fun createMessage( + groupName: String, + messageState: String, + messageType: String, + messageCorrelationId: String? + ): MessagePrioritization { return MessagePrioritization().apply { id = UUID.randomUUID().toString() @@ -101,4 +116,4 @@ object MessagePrioritizationSample { expiryDate = currentDatePlusDays(3) } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 02614d821..7e5862cce 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -23,8 +23,8 @@ import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyS object MessageProcessorUtils { - fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) - : ProcessorSupplier<K, V> { + fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): + ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name) @@ -32,5 +32,4 @@ object MessageProcessorUtils { processorInstance } } - -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 84f13dc1d..0ed9598f0 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -44,29 +44,32 @@ import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull - @RunWith(SpringRunner::class) @DataJpaTest @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, - MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]) -@TestPropertySource(properties = -[ - "spring.jpa.show-sql=true", - "spring.jpa.properties.hibernate.show_sql=true", - "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", - - "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", - "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", - "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", - - // To send initial test message - "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" -]) +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class] +) +@TestPropertySource( + properties = + [ + "spring.jpa.show-sql=true", + "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" + ] +) open class MessagePrioritizationConsumerTest { @Autowired @@ -89,7 +92,7 @@ open class MessagePrioritizationConsumerTest { assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService - .instance(MessagePrioritizationStateService::class) + .instance(MessagePrioritizationStateService::class) assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { @@ -106,7 +109,7 @@ open class MessagePrioritizationConsumerTest { val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() val streamingConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(configuration.inputTopicSelector) + .blueprintMessageConsumerService(configuration.inputTopicSelector) assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") val spyStreamingConsumerService = spyk(streamingConsumerService) @@ -115,11 +118,10 @@ open class MessagePrioritizationConsumerTest { val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) - // Test Topology val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) val messageConsumerProperties = bluePrintMessageLibPropertyService - .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") + .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) assertNotNull(topology, "failed to get create topology") @@ -130,7 +132,7 @@ open class MessagePrioritizationConsumerTest { } /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - //@Test + // @Test fun testMessagePrioritizationConsumer() { runBlocking { val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) @@ -138,38 +140,44 @@ open class MessagePrioritizationConsumerTest { /** Send sample message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService launch { - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { delay(100) val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), - headers = headers) + blueprintMessageProducerService.sendMessageNB( + message = it.asJsonString(false), + headers = headers + ) } MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(100) - val headers: MutableMap<String, String> = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), - headers = headers) - } + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + message = it.asJsonString(false), + headers = headers + ) + } MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(2000) - val headers: MutableMap<String, String> = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), - headers = headers) - } + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(2000) + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + message = it.asJsonString(false), + headers = headers + ) + } } delay(10000) messagePrioritizationConsumer.shutDown() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 4e3eb191b..be65c1d7c 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -42,6 +42,7 @@ open class TestDatabaseConfiguration { @Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { return when (messagePrioritization.group) { "group-typed" -> arrayListOf("type-0", "type-1", "type-2") @@ -54,4 +55,4 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class DefaultMessageOutputProcessor : MessageOutputProcessor()
\ No newline at end of file +open class DefaultMessageOutputProcessor : MessageOutputProcessor() diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt index b470db909..3876cbba5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -26,10 +26,14 @@ class MessageCorrelationUtilsTest { @Test fun testCorrelationKeysReordered() { - val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2") - val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-0", "key2=value2,key1=value1") + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key2=value2,key1=value1" + ) val multipleMessages: MutableList<MessagePrioritization> = arrayListOf() multipleMessages.add(message1) @@ -43,20 +47,26 @@ class MessageCorrelationUtilsTest { /** With Types **/ /* Assumption is Same group with different types */ val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) + .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2")) - assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResponse") + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2") + ) + assertTrue( + differentTypesWithSameCorrelationMessagesResponse.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResponse" + ) /* Assumption is Same group with different types and one missing expected types, In this case type-3 message is missing */ val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2", "type-3")) - assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType") + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3") + ) + assertTrue( + !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" + ) } @Test @@ -64,35 +74,48 @@ class MessageCorrelationUtilsTest { /** With ignoring Types */ /** Assumption is only one message received */ val withSameCorrelationOneMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationOneMessages, null) - assertTrue(!withSameCorrelationOneMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp") + withSameCorrelationOneMessages, null + ) + assertTrue( + !withSameCorrelationOneMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) /** Assumption is two message received for same group with same correlation */ val withSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationMessages, null) - assertTrue(withSameCorrelationMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp") + withSameCorrelationMessages, null + ) + assertTrue( + withSameCorrelationMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) } @Test fun differentTypesWithDifferentCorrelationMessage() { /** Assumption is two message received for same group with different expected types and different correlation */ - val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2") - val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-1", "key1=value1,key2=value3") + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-1", "key1=value1,key2=value3" + ) val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf() differentTypesWithDifferentCorrelationMessage.add(message1) differentTypesWithDifferentCorrelationMessage.add(message2) val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithDifferentCorrelationMessage, - arrayListOf("type-0", "type-1")) - assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated, - "failed to correlate differentTypesWithDifferentCorrelationMessageResp") + differentTypesWithDifferentCorrelationMessage, + arrayListOf("type-0", "type-1") + ) + assertTrue( + !differentTypesWithDifferentCorrelationMessageResp.correlated, + "failed to correlate differentTypesWithDifferentCorrelationMessageResp" + ) } -}
\ No newline at end of file +} |