From 341db21b2ac0a14a1ed2b8bf7930914dda054bfe Mon Sep 17 00:00:00 2001 From: "Singal, Kapil (ks220y)" Date: Fri, 22 Nov 2019 18:06:08 -0500 Subject: Formatting Code base with ktlint No Business logic change, just the code format. Competible with IntelliJ: https://github.com/pinterest/ktlint#option-3 To format run: mvn process-sources -P format Issue-ID: CCSDK-1947 Signed-off-by: Singal, Kapil (ks220y) Change-Id: Ic9e9209fb7023d77f434693ad5a01229f8d09331 --- .../AbstractMessagePrioritizeProcessor.kt | 39 +++++ .../prioritization/AbstractTopologyComponents.kt | 41 ------ .../MessagePrioritizationConfiguration.kt | 3 +- .../MessagePrioritizationConsumer.kt | 83 ++++++----- .../prioritization/MessagePrioritizationData.kt | 7 +- .../prioritization/MessagePrioritizeExtensions.kt | 9 +- .../prioritization/api/MessagePrioritizationApi.kt | 33 +++-- .../prioritization/db/MessagePrioritization.kt | 89 ++++++++++++ .../db/MessagePrioritizationRepositories.kt | 118 --------------- .../db/PrioritizationMessageEntity.kt | 81 ----------- .../db/PrioritizationMessageRepository.kt | 160 +++++++++++++++++++++ .../service/MessagePrioritizationStateService.kt | 93 +++++++----- .../topology/MessageAggregateProcessor.kt | 22 +-- .../topology/MessageOutputProcessor.kt | 3 +- .../topology/MessagePrioritizationPunctuators.kt | 33 +++-- .../topology/MessagePrioritizationSerde.kt | 64 +++++++++ .../topology/MessagePrioritizationSerdes.kt | 64 --------- .../topology/MessagePrioritizeProcessor.kt | 69 +++++---- .../utils/MessageCorrelationUtils.kt | 10 +- .../utils/MessagePrioritizationSample.kt | 39 +++-- .../prioritization/utils/MessageProcessorUtils.kt | 7 +- .../MessagePrioritizationConsumerTest.kt | 100 +++++++------ .../message/prioritization/TestConfiguration.kt | 3 +- .../utils/MessageCorrelationUtilsTest.kt | 83 +++++++---- 24 files changed, 711 insertions(+), 542 deletions(-) create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion') diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..c2965c4e8 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt @@ -0,0 +1,39 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */ +abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { + + private val log = logger(AbstractMessagePrioritizeProcessor::class) + + lateinit var prioritizationConfiguration: PrioritizationConfiguration + lateinit var messagePrioritizationStateService: MessagePrioritizationStateService + + override fun init(context: ProcessorContext) { + this.processorContext = context + /** Get the State service to update in store */ + this.messagePrioritizationStateService = BluePrintDependencyService + .messagePrioritizationStateService() + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt deleted file mode 100644 index d114da521..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - -/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */ -abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { - - private val log = logger(AbstractMessagePrioritizeProcessor::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - lateinit var messagePrioritizationStateService: MessagePrioritizationStateService - - override fun init(context: ProcessorContext) { - this.processorContext = context - /** Get the State service to update in store */ - this.messagePrioritizationStateService = BluePrintDependencyService - .messagePrioritizationStateService() - - } - -} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt index cce883c91..28e096352 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -23,7 +23,6 @@ import org.springframework.context.annotation.Configuration @ComponentScan open class MessagePrioritizationConfiguration - object MessagePrioritizationConstants { const val SOURCE_INPUT = "source-prioritization-input" @@ -34,4 +33,4 @@ object MessagePrioritizationConstants { const val SINK_OUTPUT = "sink-prioritization-output" const val SINK_EXPIRED = "sink-prioritization-expired" -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt index 967cc190e..ed124d1b2 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt @@ -28,7 +28,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsume import org.onap.ccsdk.cds.controllerblueprints.core.logger open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) { + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) { private val log = logger(MessagePrioritizationConsumer::class) @@ -36,15 +37,17 @@ open class MessagePrioritizationConsumer( open fun consumerService(selector: String): BlueprintMessageConsumerService { return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) + .blueprintMessageConsumerService(selector) } - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration) - : KafkaStreamConsumerFunction { + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { return object : KafkaStreamConsumerFunction { - override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map?): Topology { + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { val topology = Topology() val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties @@ -55,33 +58,49 @@ open class MessagePrioritizationConsumer( topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration), - MessagePrioritizationConstants.SOURCE_INPUT) - - topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - bluePrintProcessorSupplier(MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - prioritizationConfiguration), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) - - topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT, - bluePrintProcessorSupplier(MessagePrioritizationConstants.PROCESSOR_OUTPUT, - prioritizationConfiguration), - MessagePrioritizationConstants.PROCESSOR_AGGREGATE) - - topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED, - prioritizationConfiguration.expiredTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) - - /** To receive completed and error messages */ - topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT, - prioritizationConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier( MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + prioritizationConfiguration + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + bluePrintProcessorSupplier( MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - MessagePrioritizationConstants.PROCESSOR_OUTPUT) + prioritizationConfiguration + ), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_OUTPUT, + bluePrintProcessorSupplier( + MessagePrioritizationConstants.PROCESSOR_OUTPUT, + prioritizationConfiguration + ), + MessagePrioritizationConstants.PROCESSOR_AGGREGATE + ) + + topology.addSink( + MessagePrioritizationConstants.SINK_EXPIRED, + prioritizationConfiguration.expiredTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + prioritizationConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + MessagePrioritizationConstants.PROCESSOR_AGGREGATE, + MessagePrioritizationConstants.PROCESSOR_OUTPUT + ) // Output will be sent to the group-output topic from Processor API return topology @@ -102,4 +121,4 @@ open class MessagePrioritizationConsumer( streamingConsumerService.shutDown() } } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt index 3358a5643..3ecfa27e0 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt @@ -61,8 +61,9 @@ open class UpdateStateRequest : Serializable { var state: String? = null } -data class CorrelationCheckResponse(var message: String? = null, - var correlated: Boolean = false) +data class CorrelationCheckResponse( + var message: String? = null, + var correlated: Boolean = false +) data class TypeCorrelationKey(val type: String, val correlationId: String) - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index ec061ad47..39d081455 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -21,30 +21,29 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - /** * Register the MessagePrioritizationStateService and exposed dependency */ fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService = - instance(MessagePrioritizationStateService::class) + instance(MessagePrioritizationStateService::class) /** * Expose messagePrioritizationStateService to AbstractComponentFunction */ fun AbstractComponentFunction.messagePrioritizationStateService() = - BluePrintDependencyService.messagePrioritizationStateService() + BluePrintDependencyService.messagePrioritizationStateService() /** * MessagePrioritization correlation extensions */ fun MessagePrioritization.toFormatedCorrelation(): String { val ascendingKey = this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") + .map { it.trim() }.sorted().joinToString(",") return ascendingKey } fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { val ascendingKey = this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") + .map { it.trim() }.sorted().joinToString(",") return TypeCorrelationKey(this.type, ascendingKey) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt index 382cb9c8a..262dcb402 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -21,7 +21,13 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc import org.springframework.http.MediaType -import org.springframework.web.bind.annotation.* +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.ResponseBody +import org.springframework.web.bind.annotation.RestController @RestController @RequestMapping(value = ["/api/v1/message-prioritization"]) @@ -31,25 +37,30 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic @ResponseBody fun ping(): String = "Success" - @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc { messagePrioritizationStateService.getMessage(id) } - @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE]) + @PostMapping( + path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) @ResponseBody fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc { messagePrioritizationStateService.saveMessage(messagePrioritization) } - @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], - consumes = [MediaType.APPLICATION_JSON_VALUE]) + @PostMapping( + path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) = - monoMdc { - messagePrioritizationStateService.setMessageState(updateMessageState.id, - updateMessageState.state!!) - } -} \ No newline at end of file + monoMdc { + messagePrioritizationStateService.setMessageState( + updateMessageState.id, + updateMessageState.state!! + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt new file mode 100644 index 000000000..ce2085f68 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritization.kt @@ -0,0 +1,89 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import com.fasterxml.jackson.annotation.JsonFormat +import org.hibernate.annotations.Proxy +import org.springframework.data.annotation.LastModifiedDate +import org.springframework.data.jpa.domain.support.AuditingEntityListener +import org.springframework.data.jpa.repository.config.EnableJpaAuditing +import java.util.Date +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.EntityListeners +import javax.persistence.Id +import javax.persistence.Lob +import javax.persistence.Table +import javax.persistence.Temporal +import javax.persistence.TemporalType + +@EnableJpaAuditing +@EntityListeners(AuditingEntityListener::class) +@Entity +@Table(name = "MESSAGE_PRIORITIZATION") +@Proxy(lazy = false) +open class MessagePrioritization { + + @Id + @Column(name = "message_id", length = 50) + lateinit var id: String + + @Column(name = "message_group", length = 50, nullable = false) + lateinit var group: String + + @Column(name = "message_type", length = 50, nullable = false) + lateinit var type: String + + /** States Defined by MessageState */ + @Column(name = "message_state", length = 20, nullable = false) + lateinit var state: String + + @Column(name = "priority", nullable = false) + var priority: Int = 5 + + @Lob + @Column(name = "message", nullable = false) + var message: String? = null + + @Lob + @Column(name = "error", nullable = true) + var error: String? = null + + @Lob + @Column(name = "aggregated_message_ids", nullable = true) + var aggregatedMessageIds: String? = null + + @Lob + @Column(name = "correlation_id", nullable = true) + var correlationId: String? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "created_date", nullable = false) + var createdDate = Date() + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @LastModifiedDate + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "updated_date", nullable = false) + var updatedDate: Date? = null + + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "expiry_date", nullable = false) + var expiryDate: Date? = null +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt deleted file mode 100644 index 5c2495fd7..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import org.springframework.data.domain.Pageable -import org.springframework.data.jpa.repository.JpaRepository -import org.springframework.data.jpa.repository.Modifying -import org.springframework.data.jpa.repository.Query -import org.springframework.stereotype.Repository -import org.springframework.transaction.annotation.Transactional -import java.util.* - -@Repository -@Transactional(readOnly = true) -interface PrioritizationMessageRepository : JpaRepository { - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") - fun findByGroup(group: String, count: Pageable): List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.createdDate asc") - fun findByGroupAndStateIn(group: String, states: List, count: Pageable): List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "ORDER BY pm.updatedDate asc") - fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List, count: Pageable) - : List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List, expiryCheckDate: Date, - count: Pageable): List? - - @Query("FROM MessagePrioritization pm WHERE pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByStateInAndExpiredDate(states: List, expiryCheckDate: Date, - count: Pageable): List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByGroupAndStateInAndExpiredDate(group: String, states: List, expiryCheckDate: Date, - count: Pageable): List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group " + - "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc") - fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") - fun findByGroupAndCorrelationId(group: String, states: List, correlationId: String) - : List? - - @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + - "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc") - fun findByGroupAndTypesAndCorrelationId(group: String, states: List, types: List, - correlationId: String): List? - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id = :id") - fun setStateForMessageId(id: String, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id = :id") - fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + - "WHERE id IN :ids") - fun setStateForMessageIds(ids: List, state: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + - "WHERE id IN :ids") - fun setPriorityForMessageIds(ids: List, priority: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + - "WHERE id = :id") - fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("UPDATE MessagePrioritization SET state = :state, " + - "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id") - fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") - fun deleteGroup(group: String) - - @Modifying - @Transactional - @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") - fun deleteGroupAndStateIn(group: String, states: List) -} - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt deleted file mode 100644 index 15e85b0e7..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db - -import com.fasterxml.jackson.annotation.JsonFormat -import org.hibernate.annotations.Proxy -import org.springframework.data.annotation.LastModifiedDate -import org.springframework.data.jpa.domain.support.AuditingEntityListener -import org.springframework.data.jpa.repository.config.EnableJpaAuditing -import java.util.* -import javax.persistence.* - -@EnableJpaAuditing -@EntityListeners(AuditingEntityListener::class) -@Entity -@Table(name = "MESSAGE_PRIORITIZATION") -@Proxy(lazy = false) -open class MessagePrioritization { - @Id - @Column(name = "message_id", length = 50) - lateinit var id: String - - @Column(name = "message_group", length = 50, nullable = false) - lateinit var group: String - - @Column(name = "message_type", length = 50, nullable = false) - lateinit var type: String - - /** States Defined by MessageState */ - @Column(name = "message_state", length = 20, nullable = false) - lateinit var state: String - - @Column(name = "priority", nullable = false) - var priority: Int = 5 - - @Lob - @Column(name = "message", nullable = false) - var message: String? = null - - @Lob - @Column(name = "error", nullable = true) - var error: String? = null - - @Lob - @Column(name = "aggregated_message_ids", nullable = true) - var aggregatedMessageIds: String? = null - - @Lob - @Column(name = "correlation_id", nullable = true) - var correlationId: String? = null - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "created_date", nullable = false) - var createdDate = Date() - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @LastModifiedDate - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "updated_date", nullable = false) - var updatedDate: Date? = null - - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "expiry_date", nullable = false) - var expiryDate: Date? = null -} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt new file mode 100644 index 000000000..b0514838a --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt @@ -0,0 +1,160 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db + +import org.springframework.data.domain.Pageable +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Repository +@Transactional(readOnly = true) +interface PrioritizationMessageRepository : JpaRepository { + + @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc") + fun findByGroup(group: String, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateIn(group: String, states: List, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "ORDER BY pm.updatedDate asc" + ) + fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List, count: Pageable): + List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndNotExpiredDate( + group: String, + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByStateInAndExpiredDate( + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByGroupAndStateInAndExpiredDate( + group: String, + states: List, + expiryCheckDate: Date, + count: Pageable + ): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group " + + "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc" + ) + fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndCorrelationId(group: String, states: List, correlationId: String): + List? + + @Query( + "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " + + "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc" + ) + fun findByGroupAndTypesAndCorrelationId( + group: String, + states: List, + types: List, + correlationId: String + ): List? + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateForMessageId(id: String, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setStateForMessageIds(ids: List, state: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " + + "WHERE id IN :ids" + ) + fun setPriorityForMessageIds(ids: List, priority: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " + + "WHERE id = :id" + ) + fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query( + "UPDATE MessagePrioritization SET state = :state, " + + "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id" + ) + fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group") + fun deleteGroup(group: String) + + @Modifying + @Transactional + @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states") + fun deleteGroupAndStateIn(group: String, states: List) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt index 6138fa9d3..017658ff6 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt @@ -25,7 +25,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.data.domain.PageRequest import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional -import java.util.* +import java.util.Date interface MessagePrioritizationStateService { @@ -37,16 +37,20 @@ interface MessagePrioritizationStateService { suspend fun getExpiryEligibleMessages(count: Int): List? - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int) - : List? + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? - suspend fun getMessageForStatesExpired(group: String, states: List, count: Int) - : List? + suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? - suspend fun getCorrelatedMessages(group: String, states: List, types: List?, - correlationIds: String): List? + suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? suspend fun updateMessagesState(ids: List, state: String) @@ -73,8 +77,9 @@ interface MessagePrioritizationStateService { @Service open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository) - : MessagePrioritizationStateService { + private val prioritizationMessageRepository: PrioritizationMessageRepository +) : + MessagePrioritizationStateService { private val log = logger(MessagePrioritizationStateServiceImpl::class) @@ -89,7 +94,7 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getMessage(id: String): MessagePrioritization { return prioritizationMessageRepository.findById(id).orElseGet(null) - ?: throw BluePrintProcessorException("couldn't find message for id($id)") + ?: throw BluePrintProcessorException("couldn't find message for id($id)") } override suspend fun getMessages(ids: List): List? { @@ -98,30 +103,42 @@ open class MessagePrioritizationStateServiceImpl( override suspend fun getExpiryEligibleMessages(count: Int): List? { return prioritizationMessageRepository - .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), - Date(), PageRequest.of(0, count)) - } - - override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int) - : List? { - return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group, - states, Date(), PageRequest.of(0, count)) - } - - override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int) - : List? { - return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group, - states, Date(), PageRequest.of(0, count)) - } - - override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int) - : List? { - return prioritizationMessageRepository.findByByGroupAndExpiredDate(group, - expiryDate, PageRequest.of(0, count)) - } - - override suspend fun getCorrelatedMessages(group: String, states: List, types: List?, - correlationIds: String): List? { + .findByStateInAndExpiredDate( + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): + List? { + return prioritizationMessageRepository.findByByGroupAndExpiredDate( + group, + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? { return if (!types.isNullOrEmpty()) { prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) } else { @@ -185,7 +202,9 @@ open class MessagePrioritizationStateServiceImpl( } override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { - return prioritizationMessageRepository.deleteGroupAndStateIn(group, - arrayListOf(MessageState.EXPIRED.name)) + return prioritizationMessageRepository.deleteGroupAndStateIn( + group, + arrayListOf(MessageState.EXPIRED.name) + ) } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt index 45f5c773d..3e697e633 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt @@ -22,7 +22,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.M import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.controllerblueprints.core.logger - open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor() { private val log = logger(MessageAggregateProcessor::class) @@ -50,16 +49,21 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor try { /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id, - MessageState.ERROR.name, error) + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritization.id, + MessageState.ERROR.name, error + ) /** Publish to Error topic */ - this.processorContext.forward(messagePrioritization.id, messagePrioritization, - To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + this.processorContext.forward( + messagePrioritization.id, messagePrioritization, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) } catch (sendException: Exception) { - log.error("failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", e) + log.error( + "failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", e + ) } - } } } @@ -73,4 +77,4 @@ open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor() { private val log = logger(MessageOutputProcessor::class) @@ -32,4 +31,4 @@ open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor - processorContext.forward(expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_EXPIRED)) + processorContext.forward( + expired.id, expired, + To.child(MessagePrioritizationConstants.SINK_EXPIRED) + ) } } } } -class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) - : AbstractBluePrintMessagePunctuator() { +class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractBluePrintMessagePunctuator() { private val log = logger(MessagePriorityCleanPunctuator::class) lateinit var configuration: PrioritizationConfiguration override suspend fun punctuateNB(timestamp: Long) { - log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})") - //TODO + log.info( + "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + // TODO } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt new file mode 100644 index 000000000..f2a481f74 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.nio.charset.Charset + +open class MessagePrioritizationSerde : Serde { + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer { + return object : Deserializer { + override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { + return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer { + return object : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt deleted file mode 100644 index 00d454727..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.common.serialization.Deserializer -import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.common.serialization.Serializer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.nio.charset.Charset - -open class MessagePrioritizationSerde : Serde { - - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun close() { - } - - override fun deserializer(): Deserializer { - return object : Deserializer { - override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { - return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - } - - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun close() { - } - } - } - - override fun serializer(): Serializer { - return object : Serializer { - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { - return data.asJsonString().toByteArray(Charset.defaultCharset()) - } - - override fun close() { - } - } - } -} \ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt index 7dde2655d..431e02f30 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt @@ -29,8 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import java.time.Duration -import java.util.* - +import java.util.UUID open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor() { @@ -42,7 +41,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor 1) { /** Check all correlation satisfies */ val correlationResults = MessageCorrelationUtils - .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) if (correlationResults.correlated) { /** Correlation satisfied */ val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",") /** Send only correlated ids to next processor */ - this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)) + this.processorContext.forward( + UUID.randomUUID().toString(), correlatedIds, + To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) + ) } else { /** Correlation not satisfied */ log.trace("correlation not matched : ${correlationResults.message}") @@ -135,8 +152,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor? { return null } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt index cc30af2f1..fb35df75b 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt @@ -43,8 +43,8 @@ object MessageCorrelationUtils { } /** Assumption is message is of same group and checking for required types **/ - fun correlatedMessagesWithTypes(collectedMessages: List, types: List?) - : CorrelationCheckResponse { + fun correlatedMessagesWithTypes(collectedMessages: List, types: List?): + CorrelationCheckResponse { return if (!types.isNullOrEmpty() && collectedMessages.size > 1) { @@ -56,8 +56,8 @@ object MessageCorrelationUtils { val copyTypes = types.toTypedArray().copyOf().toMutableList() val filteredMessage = collectedMessages.filter { - !it.correlationId.isNullOrBlank() - && types.contains(it.type) + !it.correlationId.isNullOrBlank() && + types.contains(it.type) } var correlatedKeys: MutableSet = mutableSetOf() if (filteredMessage.isNotEmpty()) { @@ -79,4 +79,4 @@ object MessageCorrelationUtils { return correlatedMessages(collectedMessages) } } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt index 185022973..4a36a40f3 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt @@ -21,7 +21,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.E import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import java.util.* +import java.util.Calendar +import java.util.Date +import java.util.UUID object MessagePrioritizationSample { @@ -57,8 +59,10 @@ object MessagePrioritizationSample { fun sampleMessages(groupName: String, messageState: String, count: Int): List { val messages: MutableList = arrayListOf() repeat(count) { - val backPressureMessage = createMessage(groupName, messageState, - "sample-type", null) + val backPressureMessage = createMessage( + groupName, messageState, + "sample-type", null + ) messages.add(backPressureMessage) } return messages @@ -67,26 +71,37 @@ object MessagePrioritizationSample { fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List { val messages: MutableList = arrayListOf() repeat(count) { - val backPressureMessage = createMessage(groupName, messageState, "sample-type", - "key1=value1,key2=value2") + val backPressureMessage = createMessage( + groupName, messageState, "sample-type", + "key1=value1,key2=value2" + ) messages.add(backPressureMessage) } return messages } - fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String, - count: Int): List { + fun sampleMessageWithDifferentTypeSameCorrelation( + groupName: String, + messageState: String, + count: Int + ): List { val messages: MutableList = arrayListOf() repeat(count) { - val backPressureMessage = createMessage(groupName, messageState, "type-$it", - "key1=value1,key2=value2") + val backPressureMessage = createMessage( + groupName, messageState, "type-$it", + "key1=value1,key2=value2" + ) messages.add(backPressureMessage) } return messages } - fun createMessage(groupName: String, messageState: String, messageType: String, - messageCorrelationId: String?): MessagePrioritization { + fun createMessage( + groupName: String, + messageState: String, + messageType: String, + messageCorrelationId: String? + ): MessagePrioritization { return MessagePrioritization().apply { id = UUID.randomUUID().toString() @@ -101,4 +116,4 @@ object MessagePrioritizationSample { expiryDate = currentDatePlusDays(3) } } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 02614d821..7e5862cce 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -23,8 +23,8 @@ import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyS object MessageProcessorUtils { - fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration) - : ProcessorSupplier { + fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): + ProcessorSupplier { return ProcessorSupplier { // Dynamically resolve the Prioritization Processor val processorInstance = BluePrintDependencyService.instance>(name) @@ -32,5 +32,4 @@ object MessageProcessorUtils { processorInstance } } - -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 84f13dc1d..0ed9598f0 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -44,29 +44,32 @@ import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull - @RunWith(SpringRunner::class) @DataJpaTest @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, - MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]) -@TestPropertySource(properties = -[ - "spring.jpa.show-sql=true", - "spring.jpa.properties.hibernate.show_sql=true", - "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", - - "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", - "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", - "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", - - // To send initial test message - "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" -]) +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class, + MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class] +) +@TestPropertySource( + properties = + [ + "spring.jpa.show-sql=true", + "spring.jpa.properties.hibernate.show_sql=true", + "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl", + + "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application", + "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic", + + // To send initial test message + "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic" + ] +) open class MessagePrioritizationConsumerTest { @Autowired @@ -89,7 +92,7 @@ open class MessagePrioritizationConsumerTest { assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository") val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService - .instance(MessagePrioritizationStateService::class) + .instance(MessagePrioritizationStateService::class) assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService") MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach { @@ -106,7 +109,7 @@ open class MessagePrioritizationConsumerTest { val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() val streamingConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(configuration.inputTopicSelector) + .blueprintMessageConsumerService(configuration.inputTopicSelector) assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService") val spyStreamingConsumerService = spyk(streamingConsumerService) @@ -115,11 +118,10 @@ open class MessagePrioritizationConsumerTest { val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) - // Test Topology val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration) val messageConsumerProperties = bluePrintMessageLibPropertyService - .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") + .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input") val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null) assertNotNull(topology, "failed to get create topology") @@ -130,7 +132,7 @@ open class MessagePrioritizationConsumerTest { } /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - //@Test + // @Test fun testMessagePrioritizationConsumer() { runBlocking { val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) @@ -138,38 +140,44 @@ open class MessagePrioritizationConsumerTest { /** Send sample message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService launch { - MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { delay(100) val headers: MutableMap = hashMapOf() headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), - headers = headers) + blueprintMessageProducerService.sendMessageNB( + message = it.asJsonString(false), + headers = headers + ) } MessagePrioritizationSample - .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) - .forEach { - delay(100) - val headers: MutableMap = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), - headers = headers) - } + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(100) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + message = it.asJsonString(false), + headers = headers + ) + } MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) - .forEach { - delay(2000) - val headers: MutableMap = hashMapOf() - headers["id"] = it.id - blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false), - headers = headers) - } + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(2000) + val headers: MutableMap = hashMapOf() + headers["id"] = it.id + blueprintMessageProducerService.sendMessageNB( + message = it.asJsonString(false), + headers = headers + ) + } } delay(10000) messagePrioritizationConsumer.shutDown() } } -} \ No newline at end of file +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 4e3eb191b..be65c1d7c 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -42,6 +42,7 @@ open class TestDatabaseConfiguration { @Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { return when (messagePrioritization.group) { "group-typed" -> arrayListOf("type-0", "type-1", "type-2") @@ -54,4 +55,4 @@ open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() { open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor() @Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class DefaultMessageOutputProcessor : MessageOutputProcessor() \ No newline at end of file +open class DefaultMessageOutputProcessor : MessageOutputProcessor() diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt index b470db909..3876cbba5 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt @@ -26,10 +26,14 @@ class MessageCorrelationUtilsTest { @Test fun testCorrelationKeysReordered() { - val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2") - val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-0", "key2=value2,key1=value1") + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key2=value2,key1=value1" + ) val multipleMessages: MutableList = arrayListOf() multipleMessages.add(message1) @@ -43,20 +47,26 @@ class MessageCorrelationUtilsTest { /** With Types **/ /* Assumption is Same group with different types */ val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) + .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3) val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2")) - assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResponse") + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2") + ) + assertTrue( + differentTypesWithSameCorrelationMessagesResponse.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResponse" + ) /* Assumption is Same group with different types and one missing expected types, In this case type-3 message is missing */ val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithSameCorrelationMessages, - arrayListOf("type-0", "type-1", "type-2", "type-3")) - assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, - "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType") + differentTypesWithSameCorrelationMessages, + arrayListOf("type-0", "type-1", "type-2", "type-3") + ) + assertTrue( + !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated, + "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType" + ) } @Test @@ -64,35 +74,48 @@ class MessageCorrelationUtilsTest { /** With ignoring Types */ /** Assumption is only one message received */ val withSameCorrelationOneMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1) val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationOneMessages, null) - assertTrue(!withSameCorrelationOneMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp") + withSameCorrelationOneMessages, null + ) + assertTrue( + !withSameCorrelationOneMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) /** Assumption is two message received for same group with same correlation */ val withSameCorrelationMessages = MessagePrioritizationSample - .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) + .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2) val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - withSameCorrelationMessages, null) - assertTrue(withSameCorrelationMessagesResp.correlated, - "failed to correlate withSameCorrelationMessagesResp") + withSameCorrelationMessages, null + ) + assertTrue( + withSameCorrelationMessagesResp.correlated, + "failed to correlate withSameCorrelationMessagesResp" + ) } @Test fun differentTypesWithDifferentCorrelationMessage() { /** Assumption is two message received for same group with different expected types and different correlation */ - val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-0", "key1=value1,key2=value2") - val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name, - "type-1", "key1=value1,key2=value3") + val message1 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-0", "key1=value1,key2=value2" + ) + val message2 = MessagePrioritizationSample.createMessage( + "sample-group", MessageState.NEW.name, + "type-1", "key1=value1,key2=value3" + ) val differentTypesWithDifferentCorrelationMessage: MutableList = arrayListOf() differentTypesWithDifferentCorrelationMessage.add(message1) differentTypesWithDifferentCorrelationMessage.add(message2) val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes( - differentTypesWithDifferentCorrelationMessage, - arrayListOf("type-0", "type-1")) - assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated, - "failed to correlate differentTypesWithDifferentCorrelationMessageResp") + differentTypesWithDifferentCorrelationMessage, + arrayListOf("type-0", "type-1") + ) + assertTrue( + !differentTypesWithDifferentCorrelationMessageResp.correlated, + "failed to correlate differentTypesWithDifferentCorrelationMessageResp" + ) } -} \ No newline at end of file +} -- cgit 1.2.3-korg