From 863de6175fa7912930e5ee7e90215f6f3e820f36 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Wed, 8 Jan 2020 12:36:42 -0500 Subject: Rest endpoint for message Prioritization Refactored to support both Kafka and Rest endpoint prioritization. Simplified number for Kafka processors. Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh Change-Id: Iba77ed94be3398940840ff01a298f0bec785401f --- .../functions/message-prioritizaion/README.md | 5 - .../AbstractMessagePrioritizeProcessor.kt | 51 ----- .../MessagePrioritizationConfiguration.kt | 3 - .../MessagePrioritizationConsumer.kt | 125 ------------ .../prioritization/MessagePrioritizationService.kt | 29 +++ .../MessagePrioritizationStateService.kt | 68 +++++++ .../prioritization/MessagePrioritizeExtensions.kt | 1 - .../prioritization/api/MessagePrioritizationApi.kt | 17 +- .../kafka/AbstractMessagePrioritizeProcessor.kt | 34 ++++ .../kafka/DefaultMessagePrioritizeProcessor.kt | 115 +++++++++++ .../kafka/MessagePrioritizationConsumer.kt | 99 ++++++++++ .../kafka/MessagePrioritizationPunctuators.kt | 69 +++++++ .../kafka/MessagePrioritizationSerde.kt | 64 +++++++ .../AbstractMessagePrioritizationService.kt | 170 +++++++++++++++++ .../service/MessagePrioritizationStateService.kt | 210 --------------------- .../MessagePrioritizationStateServiceImpl.kt | 162 ++++++++++++++++ .../service/SampleMessagePrioritizationService.kt | 46 +++++ .../topology/MessageAggregateProcessor.kt | 80 -------- .../topology/MessageOutputProcessor.kt | 34 ---- .../topology/MessagePrioritizationPunctuators.kt | 69 ------- .../topology/MessagePrioritizationSerde.kt | 64 ------- .../topology/MessagePrioritizeProcessor.kt | 174 ----------------- .../prioritization/utils/MessageProcessorUtils.kt | 22 +-- .../MessagePrioritizationConsumerTest.kt | 45 ++++- .../message/prioritization/TestConfiguration.kt | 32 ++-- 25 files changed, 938 insertions(+), 850 deletions(-) delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt create mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt delete mode 100644 ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion') diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md index 482bbc2cc..cda43faca 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md @@ -3,7 +3,6 @@ To Delete Topics ------------------ kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog Create Topics @@ -11,7 +10,6 @@ Create Topics kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic To List topics ---------------- @@ -26,6 +24,3 @@ To Listen for Output kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt deleted file mode 100644 index 35566abb4..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService - -/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */ -abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { - - private val log = logger(AbstractMessagePrioritizeProcessor::class) - - lateinit var prioritizationConfiguration: PrioritizationConfiguration - lateinit var messagePrioritizationStateService: MessagePrioritizationStateService - var clusterService: BluePrintClusterService? = null - - override fun init(context: ProcessorContext) { - this.processorContext = context - /** Get the State service to update in store */ - this.messagePrioritizationStateService = BluePrintDependencyService - .messagePrioritizationStateService() - } - - /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ - open fun initializeClusterService() { - /** Get the Cluster service to update in store */ - if (BluePrintConstants.CLUSTER_ENABLED) { - this.clusterService = BluePrintDependencyService.clusterService() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt index 28e096352..890e0a6ba 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -28,9 +28,6 @@ object MessagePrioritizationConstants { const val SOURCE_INPUT = "source-prioritization-input" const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" - const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate" - const val PROCESSOR_OUTPUT = "processor-prioritization-output" const val SINK_OUTPUT = "sink-prioritization-output" - const val SINK_EXPIRED = "sink-prioritization-expired" } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt deleted file mode 100644 index b611060f7..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization - -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.Topology -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService -import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList - -open class MessagePrioritizationConsumer( - private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService -) { - - private val log = logger(MessagePrioritizationConsumer::class) - - lateinit var streamingConsumerService: BlueprintMessageConsumerService - - open fun consumerService(selector: String): BlueprintMessageConsumerService { - return bluePrintMessageLibPropertyService - .blueprintMessageConsumerService(selector) - } - - open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): - KafkaStreamConsumerFunction { - return object : KafkaStreamConsumerFunction { - - override suspend fun createTopology( - messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map? - ): Topology { - - val topology = Topology() - val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties - - val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() - log.info("Consuming prioritization topics($topics)") - - topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - bluePrintProcessorSupplier( - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - prioritizationConfiguration - ), - MessagePrioritizationConstants.SOURCE_INPUT - ) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - bluePrintProcessorSupplier( - MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - prioritizationConfiguration - ), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_OUTPUT, - bluePrintProcessorSupplier( - MessagePrioritizationConstants.PROCESSOR_OUTPUT, - prioritizationConfiguration - ), - MessagePrioritizationConstants.PROCESSOR_AGGREGATE - ) - - topology.addSink( - MessagePrioritizationConstants.SINK_EXPIRED, - prioritizationConfiguration.expiredTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - /** To receive completed and error messages */ - topology.addSink( - MessagePrioritizationConstants.SINK_OUTPUT, - prioritizationConfiguration.outputTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - MessagePrioritizationConstants.PROCESSOR_OUTPUT - ) - - // Output will be sent to the group-output topic from Processor API - return topology - } - } - } - - suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { - streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector) - - // Dynamic Consumer Function to create Topology - val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) - streamingConsumerService.consume(null, consumerFunction) - } - - suspend fun shutDown() { - if (streamingConsumerService != null) { - streamingConsumerService.shutDown() - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt new file mode 100644 index 000000000..584fd00d3 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -0,0 +1,29 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization + +interface MessagePrioritizationService { + + fun setKafkaProcessorContext(processorContext: ProcessorContext?) + + suspend fun prioritize(messagePrioritization: MessagePrioritization) + + suspend fun output(id: String) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt new file mode 100644 index 000000000..5dd41d7f3 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt @@ -0,0 +1,68 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import java.util.Date + +interface MessagePrioritizationStateService { + + suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization + + suspend fun getMessage(id: String): MessagePrioritization + + suspend fun getMessages(ids: List): List? + + suspend fun getExpiryEligibleMessages(count: Int): List? + + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? + + suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? + + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? + + suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? + + suspend fun updateMessagesState(ids: List, state: String) + + suspend fun updateMessageState(id: String, state: String): MessagePrioritization + + suspend fun setMessageState(id: String, state: String) + + suspend fun setMessagesPriority(ids: List, priority: String) + + suspend fun setMessagesState(ids: List, state: String) + + suspend fun setMessageStateANdError(id: String, state: String, error: String) + + suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) + + suspend fun deleteMessage(id: String) + + suspend fun deleteMessageByGroup(group: String) + + suspend fun deleteMessageStates(group: String, states: List) + + suspend fun deleteExpiredMessage(group: String, retentionDays: Int) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index bef7a7b61..05b820adb 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -17,7 +17,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt index 262dcb402..e90771fb8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -16,9 +16,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc import org.springframework.http.MediaType import org.springframework.web.bind.annotation.GetMapping @@ -31,7 +32,10 @@ import org.springframework.web.bind.annotation.RestController @RestController @RequestMapping(value = ["/api/v1/message-prioritization"]) -open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) { +open class MessagePrioritizationApi( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val messagePrioritizationService: MessagePrioritizationService +) { @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody @@ -52,6 +56,15 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic messagePrioritizationStateService.saveMessage(messagePrioritization) } + @PostMapping( + path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + @ResponseBody + fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc { + messagePrioritizationService.prioritize(messagePrioritization) + } + @PostMapping( path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], consumes = [MediaType.APPLICATION_JSON_VALUE] diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..656646ff7 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -0,0 +1,34 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ +abstract class AbstractMessagePrioritizeProcessor : AbstractBluePrintMessageProcessor() { + + private val log = logger(AbstractMessagePrioritizeProcessor::class) + + lateinit var prioritizationConfiguration: PrioritizationConfiguration + + override fun init(processorContext: ProcessorContext) { + this.processorContext = processorContext + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..c14a404ad --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -0,0 +1,115 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.Cancellable +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.time.Duration + +open class DefaultMessagePrioritizeProcessor( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val messagePrioritizationService: MessagePrioritizationService +) : AbstractMessagePrioritizeProcessor() { + + private val log = logger(DefaultMessagePrioritizeProcessor::class) + + lateinit var expiryCancellable: Cancellable + lateinit var cleanCancellable: Cancellable + + override suspend fun processNB(key: ByteArray, value: ByteArray) { + + val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + try { + messagePrioritizationService.setKafkaProcessorContext(processorContext) + messagePrioritizationService.prioritize(messagePrioritize) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + /** Publish to Output topic */ + this.processorContext.forward( + messagePrioritize.id, messagePrioritize, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override fun init(context: ProcessorContext) { + super.init(context) + /** set up expiry marking cron */ + initializeExpiryPunctuator() + /** Set up cleaning records cron */ + initializeCleanPunctuator() + } + + override fun close() { + log.info( + "closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + expiryCancellable.cancel() + cleanCancellable.cancel() + } + + open fun initializeExpiryPunctuator() { + val expiryPunctuator = + MessagePriorityExpiryPunctuator( + messagePrioritizationStateService + ) + expiryPunctuator.processorContext = processorContext + expiryPunctuator.configuration = prioritizationConfiguration + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + expiryCancellable = processorContext.schedule( + Duration.ofMillis(expiryConfiguration.frequencyMilli), + PunctuationType.WALL_CLOCK_TIME, expiryPunctuator + ) + log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") + } + + open fun initializeCleanPunctuator() { + val cleanPunctuator = + MessagePriorityCleanPunctuator( + messagePrioritizationStateService + ) + cleanPunctuator.processorContext = processorContext + cleanPunctuator.configuration = prioritizationConfiguration + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + cleanCancellable = processorContext.schedule( + Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), + PunctuationType.WALL_CLOCK_TIME, cleanPunctuator + ) + log.info( + "Clean punctuator setup complete with expiry " + + "hold(${cleanConfiguration.expiredRecordsHoldDays})days" + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt new file mode 100644 index 000000000..d7666a20b --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt @@ -0,0 +1,99 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList + +open class MessagePrioritizationConsumer( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService +) { + + private val log = logger(MessagePrioritizationConsumer::class) + + lateinit var streamingConsumerService: BlueprintMessageConsumerService + + open fun consumerService(selector: String): BlueprintMessageConsumerService { + return bluePrintMessageLibPropertyService + .blueprintMessageConsumerService(selector) + } + + open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): + KafkaStreamConsumerFunction { + return object : KafkaStreamConsumerFunction { + + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map? + ): Topology { + + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() + log.info("Consuming prioritization topics($topics)") + + topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray()) + + topology.addProcessor( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + bluePrintProcessorSupplier( + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, + prioritizationConfiguration + ), + MessagePrioritizationConstants.SOURCE_INPUT + ) + + /** To receive completed and error messages */ + topology.addSink( + MessagePrioritizationConstants.SINK_OUTPUT, + prioritizationConfiguration.outputTopic, + Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE + ) + + // Output will be sent to the group-output topic from Processor API + return topology + } + } + } + + suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) { + streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector) + + // Dynamic Consumer Function to create Topology + val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration) + streamingConsumerService.consume(null, consumerFunction) + } + + suspend fun shutDown() { + if (streamingConsumerService != null) { + streamingConsumerService.shutDown() + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt new file mode 100644 index 000000000..e27cf16d0 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt @@ -0,0 +1,69 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractBluePrintMessagePunctuator() { + + private val log = logger(MessagePriorityExpiryPunctuator::class) + lateinit var configuration: PrioritizationConfiguration + + override suspend fun punctuateNB(timestamp: Long) { + + log.info( + "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + val expiryConfiguration = configuration.expiryConfiguration + val fetchMessages = messagePrioritizationStateService + .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) + + val expiredIds = fetchMessages?.map { it.id } + if (expiredIds != null && expiredIds.isNotEmpty()) { + messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) + fetchMessages.forEach { expired -> + processorContext.forward( + expired.id, expired, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + } +} + +class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractBluePrintMessagePunctuator() { + + private val log = logger(MessagePriorityCleanPunctuator::class) + lateinit var configuration: PrioritizationConfiguration + + override suspend fun punctuateNB(timestamp: Long) { + log.info( + "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + // TODO + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt new file mode 100644 index 000000000..5595863d4 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt @@ -0,0 +1,64 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.nio.charset.Charset + +open class MessagePrioritizationSerde : Serde { + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer { + return object : Deserializer { + override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { + return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer { + return object : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt new file mode 100644 index 000000000..13c0dd7bc --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -0,0 +1,170 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/ +abstract class AbstractMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : MessagePrioritizationService { + + private val log = logger(AbstractMessagePrioritizationService::class) + + var processorContext: ProcessorContext? = null + + override fun setKafkaProcessorContext(processorContext: ProcessorContext?) { + this.processorContext = processorContext + } + + override suspend fun prioritize(messagePrioritize: MessagePrioritization) { + try { + log.info("***** received in prioritize processor key(${messagePrioritize.id})") + /** Get the cluster lock for message group */ + val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) + // Save the Message + messagePrioritizationStateService.saveMessage(messagePrioritize) + handleCorrelationAndNextStep(messagePrioritize) + /** Cluster unLock for message group */ + MessageProcessorUtils.prioritizationGroupUnLock(clusterLock) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + } + } + + override suspend fun output(id: String) { + log.info("$$$$$ received in output processor id($id)") + val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name) + /** Check for Kafka Processing, If yes, then send to the output topic */ + if (this.processorContext != null) { + processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } + } + + open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { + /** Check correlation enabled and correlation field has populated */ + if (!messagePrioritization.correlationId.isNullOrBlank()) { + val id = messagePrioritization.id + val group = messagePrioritization.group + val correlationId = messagePrioritization.correlationId!! + val types = getGroupCorrelationTypes(messagePrioritization) + log.info( + "checking correlation for message($id), group($group), types($types), " + + "correlation id($correlationId)" + ) + + /** Get all previously received messages from database for group and optional types and correlation Id */ + val waitingCorrelatedStoreMessages = messagePrioritizationStateService + .getCorrelatedMessages( + group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId + ) + + /** If multiple records found, then check correlation */ + if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { + /** Check all correlation satisfies */ + val correlationResults = MessageCorrelationUtils + .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) + + if (correlationResults.correlated) { + /** Correlation satisfied */ + val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id } + /** Send only correlated ids to aggregate processor */ + aggregate(correlatedIds) + } else { + /** Correlation not satisfied */ + log.trace("correlation not matched : ${correlationResults.message}") + val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id } + // Update the Message state to Wait + messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name) + } + } else { + /** received first message of group and correlation Id, update the message with wait state */ + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) + } + } else { + // No Correlation check needed, simply forward to next processor. + messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) + aggregate(messagePrioritization.id) + } + } + + open suspend fun aggregate(strIds: String) { + log.info("@@@@@ received in aggregation processor ids($strIds)") + val ids = strIds.split(",").map { it.trim() } + if (!ids.isNullOrEmpty()) { + try { + if (ids.size == 1) { + /** No aggregation or sequencing needed, simpley forward to next processor */ + output(ids.first()) + } else { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(ids) + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) + } + } catch (e: Exception) { + val error = "failed in Aggregate message($ids) : ${e.message}" + log.error(error, e) + val storeMessages = messagePrioritizationStateService.getMessages(ids) + if (!storeMessages.isNullOrEmpty()) { + storeMessages.forEach { messagePrioritization -> + try { + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritization.id, + MessageState.ERROR.name, error + ) + /** Publish to output topic */ + output(messagePrioritization.id) + } catch (sendException: Exception) { + log.error( + "failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", e + ) + } + } + } + } + } + } + + /** Child will override this implementation , if necessary + * Here the place child has to implement custom Sequencing and Aggregation logic. + * */ + abstract suspend fun handleAggregation(messageIds: List) + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt deleted file mode 100644 index 017658ff6..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service - -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.springframework.data.domain.PageRequest -import org.springframework.stereotype.Service -import org.springframework.transaction.annotation.Transactional -import java.util.Date - -interface MessagePrioritizationStateService { - - suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization - - suspend fun getMessage(id: String): MessagePrioritization - - suspend fun getMessages(ids: List): List? - - suspend fun getExpiryEligibleMessages(count: Int): List? - - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): - List? - - suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): - List? - - suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List? - - suspend fun getCorrelatedMessages( - group: String, - states: List, - types: List?, - correlationIds: String - ): List? - - suspend fun updateMessagesState(ids: List, state: String) - - suspend fun updateMessageState(id: String, state: String): MessagePrioritization - - suspend fun setMessageState(id: String, state: String) - - suspend fun setMessagesPriority(ids: List, priority: String) - - suspend fun setMessagesState(ids: List, state: String) - - suspend fun setMessageStateANdError(id: String, state: String, error: String) - - suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) - - suspend fun deleteMessage(id: String) - - suspend fun deleteMessageByGroup(group: String) - - suspend fun deleteMessageStates(group: String, states: List) - - suspend fun deleteExpiredMessage(group: String, retentionDays: Int) -} - -@Service -open class MessagePrioritizationStateServiceImpl( - private val prioritizationMessageRepository: PrioritizationMessageRepository -) : - MessagePrioritizationStateService { - - private val log = logger(MessagePrioritizationStateServiceImpl::class) - - @Transactional - override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { - if (!message.correlationId.isNullOrBlank()) { - message.correlationId = message.toFormatedCorrelation() - } - message.updatedDate = Date() - return prioritizationMessageRepository.save(message) - } - - override suspend fun getMessage(id: String): MessagePrioritization { - return prioritizationMessageRepository.findById(id).orElseGet(null) - ?: throw BluePrintProcessorException("couldn't find message for id($id)") - } - - override suspend fun getMessages(ids: List): List? { - return prioritizationMessageRepository.findAllById(ids) - } - - override suspend fun getExpiryEligibleMessages(count: Int): List? { - return prioritizationMessageRepository - .findByStateInAndExpiredDate( - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), - Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): - List? { - return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): - List? { - return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( - group, - states, Date(), PageRequest.of(0, count) - ) - } - - override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): - List? { - return prioritizationMessageRepository.findByByGroupAndExpiredDate( - group, - expiryDate, PageRequest.of(0, count) - ) - } - - override suspend fun getCorrelatedMessages( - group: String, - states: List, - types: List?, - correlationIds: String - ): List? { - return if (!types.isNullOrEmpty()) { - prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) - } else { - prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) - } - } - - @Transactional - override suspend fun updateMessagesState(ids: List, state: String) { - ids.forEach { - val updated = updateMessageState(it, state) - log.info("message($it) update to state(${updated.state})") - } - } - - @Transactional - override suspend fun setMessageState(id: String, state: String) { - prioritizationMessageRepository.setStateForMessageId(id, state, Date()) - } - - @Transactional - override suspend fun setMessagesPriority(ids: List, priority: String) { - prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date()) - } - - @Transactional - override suspend fun setMessagesState(ids: List, state: String) { - prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) - } - - @Transactional - override suspend fun setMessageStateANdError(id: String, state: String, error: String) { - prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) - } - - @Transactional - override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { - val updateMessage = getMessage(id).apply { - this.updatedDate = Date() - this.state = state - } - return saveMessage(updateMessage) - } - - @Transactional - override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) { - val groupedIds = aggregatedIds.joinToString(",") - prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) - } - - override suspend fun deleteMessage(id: String) { - return prioritizationMessageRepository.deleteById(id) - } - - override suspend fun deleteMessageByGroup(group: String) { - return prioritizationMessageRepository.deleteGroup(group) - } - - override suspend fun deleteMessageStates(group: String, states: List) { - return prioritizationMessageRepository.deleteGroupAndStateIn(group, states) - } - - override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { - return prioritizationMessageRepository.deleteGroupAndStateIn( - group, - arrayListOf(MessageState.EXPIRED.name) - ) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt new file mode 100644 index 000000000..d9cd956bf --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -0,0 +1,162 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.data.domain.PageRequest +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.util.Date + +@Service +open class MessagePrioritizationStateServiceImpl( + private val prioritizationMessageRepository: PrioritizationMessageRepository +) : MessagePrioritizationStateService { + + private val log = logger(MessagePrioritizationStateServiceImpl::class) + + @Transactional + override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization { + if (!message.correlationId.isNullOrBlank()) { + message.correlationId = message.toFormatedCorrelation() + } + message.updatedDate = Date() + return prioritizationMessageRepository.save(message) + } + + override suspend fun getMessage(id: String): MessagePrioritization { + return prioritizationMessageRepository.findById(id).orElseGet(null) + ?: throw BluePrintProcessorException("couldn't find message for id($id)") + } + + override suspend fun getMessages(ids: List): List? { + return prioritizationMessageRepository.findAllById(ids) + } + + override suspend fun getExpiryEligibleMessages(count: Int): List? { + return prioritizationMessageRepository + .findByStateInAndExpiredDate( + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), + Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getMessageForStatesExpired(group: String, states: List, count: Int): + List? { + return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( + group, + states, Date(), PageRequest.of(0, count) + ) + } + + override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): + List? { + return prioritizationMessageRepository.findByByGroupAndExpiredDate( + group, + expiryDate, PageRequest.of(0, count) + ) + } + + override suspend fun getCorrelatedMessages( + group: String, + states: List, + types: List?, + correlationIds: String + ): List? { + return if (!types.isNullOrEmpty()) { + prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds) + } else { + prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds) + } + } + + @Transactional + override suspend fun updateMessagesState(ids: List, state: String) { + ids.forEach { + val updated = updateMessageState(it, state) + log.info("message($it) update to state(${updated.state})") + } + } + + @Transactional + override suspend fun setMessageState(id: String, state: String) { + prioritizationMessageRepository.setStateForMessageId(id, state, Date()) + } + + @Transactional + override suspend fun setMessagesPriority(ids: List, priority: String) { + prioritizationMessageRepository.setPriorityForMessageIds(ids, priority, Date()) + } + + @Transactional + override suspend fun setMessagesState(ids: List, state: String) { + prioritizationMessageRepository.setStateForMessageIds(ids, state, Date()) + } + + @Transactional + override suspend fun setMessageStateANdError(id: String, state: String, error: String) { + prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date()) + } + + @Transactional + override suspend fun updateMessageState(id: String, state: String): MessagePrioritization { + val updateMessage = getMessage(id).apply { + this.updatedDate = Date() + this.state = state + } + return saveMessage(updateMessage) + } + + @Transactional + override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List) { + val groupedIds = aggregatedIds.joinToString(",") + prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date()) + } + + override suspend fun deleteMessage(id: String) { + return prioritizationMessageRepository.deleteById(id) + } + + override suspend fun deleteMessageByGroup(group: String) { + return prioritizationMessageRepository.deleteGroup(group) + } + + override suspend fun deleteMessageStates(group: String, states: List) { + return prioritizationMessageRepository.deleteGroupAndStateIn(group, states) + } + + override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) { + return prioritizationMessageRepository.deleteGroupAndStateIn( + group, + arrayListOf(MessageState.EXPIRED.name) + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt new file mode 100644 index 000000000..fcdb71cda --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -0,0 +1,46 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(DefaultMessagePrioritizeProcessor::class) + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messageIds: List) { + log.info("messages($messageIds) aggregated") + messageIds.forEach { id -> + output(id) + } + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { + return when (messagePrioritization.group) { + /** Dummy Implementation, This can also be read from file and stored as cached map **/ + "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + else -> null + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt deleted file mode 100644 index 3e697e633..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor() { - - private val log = logger(MessageAggregateProcessor::class) - - override suspend fun processNB(key: String, value: String) { - - log.info("@@@@@ received in aggregation processor key($key), value($value)") - val ids = value.split(",").map { it.trim() } - if (!ids.isNullOrEmpty()) { - try { - if (ids.size == 1) { - processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) - } else { - /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ - handleAggregation(ids) - /** Update all messages to Aggregated state */ - messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) - } - } catch (e: Exception) { - val error = "failed in Aggregate message($ids) : ${e.message}" - log.error(error, e) - val storeMessages = messagePrioritizationStateService.getMessages(ids) - if (!storeMessages.isNullOrEmpty()) { - storeMessages.forEach { messagePrioritization -> - try { - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritization.id, - MessageState.ERROR.name, error - ) - /** Publish to Error topic */ - this.processorContext.forward( - messagePrioritization.id, messagePrioritization, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } catch (sendException: Exception) { - log.error( - "failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", e - ) - } - } - } - } - } - } - - /** Child will override this implementation , if necessary */ - open suspend fun handleAggregation(messageIds: List) { - log.info("messages($messageIds) aggregated") - messageIds.forEach { id -> - processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt deleted file mode 100644 index cf6520df5..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor() { - - private val log = logger(MessageOutputProcessor::class) - - override suspend fun processNB(key: String, value: String) { - log.info("$$$$$ received in output processor key($key), value($value)") - val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name) - processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt deleted file mode 100644 index 5435ebe30..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService -import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractBluePrintMessagePunctuator() { - - private val log = logger(MessagePriorityExpiryPunctuator::class) - lateinit var configuration: PrioritizationConfiguration - - override suspend fun punctuateNB(timestamp: Long) { - - log.info( - "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - val expiryConfiguration = configuration.expiryConfiguration - val fetchMessages = messagePrioritizationStateService - .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord) - - val expiredIds = fetchMessages?.map { it.id } - if (expiredIds != null && expiredIds.isNotEmpty()) { - messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name) - fetchMessages.forEach { expired -> - processorContext.forward( - expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_EXPIRED) - ) - } - } - } -} - -class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) : - AbstractBluePrintMessagePunctuator() { - - private val log = logger(MessagePriorityCleanPunctuator::class) - lateinit var configuration: PrioritizationConfiguration - - override suspend fun punctuateNB(timestamp: Long) { - log.info( - "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - // TODO - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt deleted file mode 100644 index f2a481f74..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.common.serialization.Deserializer -import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.common.serialization.Serializer -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.nio.charset.Charset - -open class MessagePrioritizationSerde : Serde { - - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun close() { - } - - override fun deserializer(): Deserializer { - return object : Deserializer { - override fun deserialize(topic: String, data: ByteArray): MessagePrioritization { - return JacksonUtils.readValue(String(data), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - } - - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun close() { - } - } - } - - override fun serializer(): Serializer { - return object : Serializer { - override fun configure(configs: MutableMap?, isKey: Boolean) { - } - - override fun serialize(topic: String?, data: MessagePrioritization): ByteArray { - return data.asJsonString().toByteArray(Charset.defaultCharset()) - } - - override fun close() { - } - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt deleted file mode 100644 index 4e4e2da7a..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.streams.processor.Cancellable -import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.PunctuationType -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.time.Duration -import java.util.UUID - -open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor() { - - private val log = logger(MessagePrioritizeProcessor::class) - - lateinit var expiryCancellable: Cancellable - lateinit var cleanCancellable: Cancellable - - override suspend fun processNB(key: ByteArray, value: ByteArray) { - log.info("***** received in prioritize processor key(${String(key)})") - val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") - try { - /** Get the cluster lock for message group */ - val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize) - // Save the Message - messagePrioritizationStateService.saveMessage(messagePrioritize) - handleCorrelationAndNextStep(messagePrioritize) - /** Cluster unLock for message group */ - MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock) - } catch (e: Exception) { - messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" - log.error(messagePrioritize.error) - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritize.id, MessageState.ERROR.name, - messagePrioritize.error!! - ) - /** Publish to Output topic */ - this.processorContext.forward( - messagePrioritize.id, messagePrioritize, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } - } - - override fun init(context: ProcessorContext) { - super.init(context) - /** set up expiry marking cron */ - initializeExpiryPunctuator() - /** Set up cleaning records cron */ - initializeCleanPunctuator() - /** Set up Cluster Service */ - initializeClusterService() - } - - override fun close() { - log.info( - "closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - expiryCancellable.cancel() - cleanCancellable.cancel() - } - - open fun initializeExpiryPunctuator() { - val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService) - expiryPunctuator.processorContext = processorContext - expiryPunctuator.configuration = prioritizationConfiguration - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - expiryCancellable = processorContext.schedule( - Duration.ofMillis(expiryConfiguration.frequencyMilli), - PunctuationType.WALL_CLOCK_TIME, expiryPunctuator - ) - log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") - } - - open fun initializeCleanPunctuator() { - val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService) - cleanPunctuator.processorContext = processorContext - cleanPunctuator.configuration = prioritizationConfiguration - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - cleanCancellable = processorContext.schedule( - Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), - PunctuationType.WALL_CLOCK_TIME, cleanPunctuator - ) - log.info( - "Clean punctuator setup complete with expiry " + - "hold(${cleanConfiguration.expiredRecordsHoldDays})days" - ) - } - - open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { - /** Check correlation enabled and correlation field has populated */ - if (!messagePrioritization.correlationId.isNullOrBlank()) { - val id = messagePrioritization.id - val group = messagePrioritization.group - val correlationId = messagePrioritization.correlationId!! - val types = getGroupCorrelationTypes(messagePrioritization) - log.info( - "checking correlation for message($id), group($group), types($types), " + - "correlation id($correlationId)" - ) - - /** Get all previously received messages from database for group and optional types and correlation Id */ - val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages( - group, - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId - ) - - /** If multiple records found, then check correlation */ - if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { - /** Check all correlation satisfies */ - val correlationResults = MessageCorrelationUtils - .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types) - - if (correlationResults.correlated) { - /** Correlation satisfied */ - val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",") - /** Send only correlated ids to next processor */ - this.processorContext.forward( - UUID.randomUUID().toString(), correlatedIds, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) - ) - } else { - /** Correlation not satisfied */ - log.trace("correlation not matched : ${correlationResults.message}") - val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id } - // Update the Message state to Wait - messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name) - } - } else { - /** received first message of group and correlation Id, update the message with wait state */ - messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name) - } - } else { - // No Correlation check needed, simply forward to next processor. - messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) - this.processorContext.forward( - messagePrioritization.id, messagePrioritization.id, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) - ) - } - } - - /** If consumer wants specific correlation with respect to group and types, then populate the specific types, - * otherwise correlation happens with group and correlationId */ - open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { - return null - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 49230b6e4..186499d66 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -17,28 +17,27 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils import org.apache.kafka.streams.processor.ProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService object MessageProcessorUtils { - /** Utility to create the cluster lock for message [messagePrioritization] */ - suspend fun prioritizationGrouplock( - clusterService: BluePrintClusterService?, - messagePrioritization: MessagePrioritization - ): ClusterLock? { + /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ + suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined() && !messagePrioritization.correlationId.isNullOrBlank() ) { // Get the correlation key in ascending order, even it it is misplaced val correlationId = messagePrioritization.toFormatedCorrelation() - val lockName = "prioritization-${messagePrioritization.group}-$correlationId" + val lockName = "prioritize::${messagePrioritization.group}::$correlationId" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") @@ -46,14 +45,15 @@ object MessageProcessorUtils { } else null } - /** Utility used to cluster unlock for message [messagePrioritization] */ - suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) { - if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) { + /** Utility used to cluster unlock for message [clusterLock] */ + suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) { + if (clusterLock != null) { clusterLock.unLock() clusterLock.close() } } + /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/ fun bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): ProcessorSupplier { return ProcessorSupplier { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index f9e23e826..ec0515c42 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -27,12 +27,13 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest @@ -43,6 +44,7 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull +import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DataJpaTest @@ -72,6 +74,8 @@ import kotlin.test.assertNotNull ) open class MessagePrioritizationConsumerTest { + private val log = logger(MessagePrioritizationConsumerTest::class) + @Autowired lateinit var applicationContext: ApplicationContext @@ -81,6 +85,9 @@ open class MessagePrioritizationConsumerTest { @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + @Autowired + lateinit var messagePrioritizationService: MessagePrioritizationService + @Autowired lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer @@ -106,6 +113,38 @@ open class MessagePrioritizationConsumerTest { } } + @Test + fun testMessagePrioritizationService() { + runBlocking { + assertTrue( + ::messagePrioritizationService.isInitialized, + "failed to initialize messagePrioritizationService" + ) + + log.info("**************** without Correlation **************") + /** Checking without correlation */ + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + messagePrioritizationService.prioritize(it) + } + log.info("**************** Same Group , with Correlation **************") + /** checking same group with correlation */ + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + log.info("**************** Different Type , with Correlation **************") + /** checking different type, with correlation */ + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + } + } + @Test fun testStartConsuming() { runBlocking { @@ -118,7 +157,9 @@ open class MessagePrioritizationConsumerTest { val spyStreamingConsumerService = spyk(streamingConsumerService) coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + val messagePrioritizationConsumer = MessagePrioritizationConsumer( + bluePrintMessageLibPropertyService + ) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) // Test Topology diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 3d3d0c6f5..0285079ad 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -17,10 +17,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean @@ -65,22 +64,17 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio */ @Service -open class SampleMessagePrioritizationConsumer( +open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : + SampleMessagePrioritizationService(messagePrioritizationStateService) + +/** For Kafka Consumer **/ +@Service +open class TestMessagePrioritizationConsumer( bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService ) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) @Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() { - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List? { - return when (messagePrioritization.group) { - "group-typed" -> arrayListOf("type-0", "type-1", "type-2") - else -> null - } - } -} - -@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) -open class SampleMessageAggregateProcessor() : MessageAggregateProcessor() - -@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class SampleMessageOutputProcessor : MessageOutputProcessor() +open class TestMessagePrioritizeProcessor( + messagePrioritizationStateService: MessagePrioritizationStateService, + messagePrioritizationService: MessagePrioritizationService +) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService) -- cgit 1.2.3-korg