diff options
author | Brinda Santh <bs2796@att.com> | 2020-01-08 12:36:42 -0500 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2020-01-08 12:36:42 -0500 |
commit | 863de6175fa7912930e5ee7e90215f6f3e820f36 (patch) | |
tree | ce2d4d2c3f0012c3a25daa767aa6768bdb99fe31 /ms/blueprintsprocessor/functions/message-prioritizaion | |
parent | 0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 (diff) |
Rest endpoint for message Prioritization
Refactored to support both Kafka and Rest endpoint prioritization.
Simplified number for Kafka processors.
Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Iba77ed94be3398940840ff01a298f0bec785401f
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion')
19 files changed, 440 insertions, 352 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md index 482bbc2cc..cda43faca 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md @@ -3,7 +3,6 @@ To Delete Topics ------------------ kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic -kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog Create Topics @@ -11,7 +10,6 @@ Create Topics kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic -kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic To List topics ---------------- @@ -26,6 +24,3 @@ To Listen for Output kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning - -kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning - diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt index 28e096352..890e0a6ba 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt @@ -28,9 +28,6 @@ object MessagePrioritizationConstants { const val SOURCE_INPUT = "source-prioritization-input" const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize" - const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate" - const val PROCESSOR_OUTPUT = "processor-prioritization-output" const val SINK_OUTPUT = "sink-prioritization-output" - const val SINK_EXPIRED = "sink-prioritization-expired" } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt new file mode 100644 index 000000000..584fd00d3 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt @@ -0,0 +1,29 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.apache.kafka.streams.processor.ProcessorContext +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization + +interface MessagePrioritizationService { + + fun setKafkaProcessorContext(processorContext: ProcessorContext?) + + suspend fun prioritize(messagePrioritization: MessagePrioritization) + + suspend fun output(id: String) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt new file mode 100644 index 000000000..5dd41d7f3 --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt @@ -0,0 +1,68 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import java.util.Date + +interface MessagePrioritizationStateService { + + suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization + + suspend fun getMessage(id: String): MessagePrioritization + + suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? + + suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? + + suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): + List<MessagePrioritization>? + + suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): + List<MessagePrioritization>? + + suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? + + suspend fun getCorrelatedMessages( + group: String, + states: List<String>, + types: List<String>?, + correlationIds: String + ): List<MessagePrioritization>? + + suspend fun updateMessagesState(ids: List<String>, state: String) + + suspend fun updateMessageState(id: String, state: String): MessagePrioritization + + suspend fun setMessageState(id: String, state: String) + + suspend fun setMessagesPriority(ids: List<String>, priority: String) + + suspend fun setMessagesState(ids: List<String>, state: String) + + suspend fun setMessageStateANdError(id: String, state: String, error: String) + + suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) + + suspend fun deleteMessage(id: String) + + suspend fun deleteMessageByGroup(group: String) + + suspend fun deleteMessageStates(group: String, states: List<String>) + + suspend fun deleteExpiredMessage(group: String, retentionDays: Int) +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index bef7a7b61..05b820adb 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -17,7 +17,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt index 262dcb402..e90771fb8 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt @@ -16,9 +16,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc import org.springframework.http.MediaType import org.springframework.web.bind.annotation.GetMapping @@ -31,7 +32,10 @@ import org.springframework.web.bind.annotation.RestController @RestController @RequestMapping(value = ["/api/v1/message-prioritization"]) -open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) { +open class MessagePrioritizationApi( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val messagePrioritizationService: MessagePrioritizationService +) { @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody @@ -53,6 +57,15 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic } @PostMapping( + path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE], + consumes = [MediaType.APPLICATION_JSON_VALUE] + ) + @ResponseBody + fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc { + messagePrioritizationService.prioritize(messagePrioritization) + } + + @PostMapping( path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE], consumes = [MediaType.APPLICATION_JSON_VALUE] ) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt index 35566abb4..656646ff7 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt @@ -14,38 +14,21 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.ProcessorContext -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */ +/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() { private val log = logger(AbstractMessagePrioritizeProcessor::class) lateinit var prioritizationConfiguration: PrioritizationConfiguration - lateinit var messagePrioritizationStateService: MessagePrioritizationStateService - var clusterService: BluePrintClusterService? = null - override fun init(context: ProcessorContext) { - this.processorContext = context - /** Get the State service to update in store */ - this.messagePrioritizationStateService = BluePrintDependencyService - .messagePrioritizationStateService() - } - - /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */ - open fun initializeClusterService() { - /** Get the Cluster service to update in store */ - if (BluePrintConstants.CLUSTER_ENABLED) { - this.clusterService = BluePrintDependencyService.clusterService() - } + override fun init(processorContext: ProcessorContext) { + this.processorContext = processorContext } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt new file mode 100644 index 000000000..c14a404ad --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt @@ -0,0 +1,115 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka + +import org.apache.kafka.streams.processor.Cancellable +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.PunctuationType +import org.apache.kafka.streams.processor.To +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.time.Duration + +open class DefaultMessagePrioritizeProcessor( + private val messagePrioritizationStateService: MessagePrioritizationStateService, + private val messagePrioritizationService: MessagePrioritizationService +) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { + + private val log = logger(DefaultMessagePrioritizeProcessor::class) + + lateinit var expiryCancellable: Cancellable + lateinit var cleanCancellable: Cancellable + + override suspend fun processNB(key: ByteArray, value: ByteArray) { + + val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) + ?: throw BluePrintProcessorException("failed to convert") + try { + messagePrioritizationService.setKafkaProcessorContext(processorContext) + messagePrioritizationService.prioritize(messagePrioritize) + } catch (e: Exception) { + messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" + log.error(messagePrioritize.error) + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritize.id, MessageState.ERROR.name, + messagePrioritize.error!! + ) + /** Publish to Output topic */ + this.processorContext.forward( + messagePrioritize.id, messagePrioritize, + To.child(MessagePrioritizationConstants.SINK_OUTPUT) + ) + } + } + + override fun init(context: ProcessorContext) { + super.init(context) + /** set up expiry marking cron */ + initializeExpiryPunctuator() + /** Set up cleaning records cron */ + initializeCleanPunctuator() + } + + override fun close() { + log.info( + "closing prioritization processor applicationId(${processorContext.applicationId()}), " + + "taskId(${processorContext.taskId()})" + ) + expiryCancellable.cancel() + cleanCancellable.cancel() + } + + open fun initializeExpiryPunctuator() { + val expiryPunctuator = + MessagePriorityExpiryPunctuator( + messagePrioritizationStateService + ) + expiryPunctuator.processorContext = processorContext + expiryPunctuator.configuration = prioritizationConfiguration + val expiryConfiguration = prioritizationConfiguration.expiryConfiguration + expiryCancellable = processorContext.schedule( + Duration.ofMillis(expiryConfiguration.frequencyMilli), + PunctuationType.WALL_CLOCK_TIME, expiryPunctuator + ) + log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") + } + + open fun initializeCleanPunctuator() { + val cleanPunctuator = + MessagePriorityCleanPunctuator( + messagePrioritizationStateService + ) + cleanPunctuator.processorContext = processorContext + cleanPunctuator.configuration = prioritizationConfiguration + val cleanConfiguration = prioritizationConfiguration.cleanConfiguration + cleanCancellable = processorContext.schedule( + Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), + PunctuationType.WALL_CLOCK_TIME, cleanPunctuator + ) + log.info( + "Clean punctuator setup complete with expiry " + + "hold(${cleanConfiguration.expiredRecordsHoldDays})days" + ) + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt index b611060f7..d7666a20b 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt @@ -14,11 +14,12 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.Topology -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties @@ -42,7 +43,7 @@ open class MessagePrioritizationConsumer( } open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration): - KafkaStreamConsumerFunction { + KafkaStreamConsumerFunction { return object : KafkaStreamConsumerFunction { override suspend fun createTopology( @@ -52,7 +53,7 @@ open class MessagePrioritizationConsumer( val topology = Topology() val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties - as KafkaStreamsBasicAuthConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList() log.info("Consuming prioritization topics($topics)") @@ -68,39 +69,12 @@ open class MessagePrioritizationConsumer( MessagePrioritizationConstants.SOURCE_INPUT ) - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - bluePrintProcessorSupplier<String, String>( - MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - prioritizationConfiguration - ), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - - topology.addProcessor( - MessagePrioritizationConstants.PROCESSOR_OUTPUT, - bluePrintProcessorSupplier<String, String>( - MessagePrioritizationConstants.PROCESSOR_OUTPUT, - prioritizationConfiguration - ), - MessagePrioritizationConstants.PROCESSOR_AGGREGATE - ) - - topology.addSink( - MessagePrioritizationConstants.SINK_EXPIRED, - prioritizationConfiguration.expiredTopic, - Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE - ) - /** To receive completed and error messages */ topology.addSink( MessagePrioritizationConstants.SINK_OUTPUT, prioritizationConfiguration.outputTopic, Serdes.String().serializer(), MessagePrioritizationSerde().serializer(), - MessagePrioritizationConstants.PROCESSOR_PRIORITIZE, - MessagePrioritizationConstants.PROCESSOR_AGGREGATE, - MessagePrioritizationConstants.PROCESSOR_OUTPUT + MessagePrioritizationConstants.PROCESSOR_PRIORITIZE ) // Output will be sent to the group-output topic from Processor API diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt index 5435ebe30..e27cf16d0 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt @@ -14,13 +14,13 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.streams.processor.To import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -46,7 +46,7 @@ class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateServ fetchMessages.forEach { expired -> processorContext.forward( expired.id, expired, - To.child(MessagePrioritizationConstants.SINK_EXPIRED) + To.child(MessagePrioritizationConstants.SINK_OUTPUT) ) } } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt index f2a481f74..5595863d4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.serialization.Serde diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt index 4e4e2da7a..13c0dd7bc 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt @@ -14,43 +14,42 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service -import org.apache.kafka.streams.processor.Cancellable import org.apache.kafka.streams.processor.ProcessorContext -import org.apache.kafka.streams.processor.PunctuationType import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils -import java.time.Duration -import java.util.UUID -open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() { +/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/ +abstract class AbstractMessagePrioritizationService( + private val messagePrioritizationStateService: MessagePrioritizationStateService +) : MessagePrioritizationService { - private val log = logger(MessagePrioritizeProcessor::class) + private val log = logger(AbstractMessagePrioritizationService::class) - lateinit var expiryCancellable: Cancellable - lateinit var cleanCancellable: Cancellable + var processorContext: ProcessorContext? = null - override suspend fun processNB(key: ByteArray, value: ByteArray) { - log.info("***** received in prioritize processor key(${String(key)})") - val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java) - ?: throw BluePrintProcessorException("failed to convert") + override fun setKafkaProcessorContext(processorContext: ProcessorContext?) { + this.processorContext = processorContext + } + + override suspend fun prioritize(messagePrioritize: MessagePrioritization) { try { + log.info("***** received in prioritize processor key(${messagePrioritize.id})") /** Get the cluster lock for message group */ - val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize) + val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize) // Save the Message messagePrioritizationStateService.saveMessage(messagePrioritize) handleCorrelationAndNextStep(messagePrioritize) /** Cluster unLock for message group */ - MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock) + MessageProcessorUtils.prioritizationGroupUnLock(clusterLock) } catch (e: Exception) { messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}" log.error(messagePrioritize.error) @@ -59,58 +58,16 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA messagePrioritize.id, MessageState.ERROR.name, messagePrioritize.error!! ) - /** Publish to Output topic */ - this.processorContext.forward( - messagePrioritize.id, messagePrioritize, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) } } - override fun init(context: ProcessorContext) { - super.init(context) - /** set up expiry marking cron */ - initializeExpiryPunctuator() - /** Set up cleaning records cron */ - initializeCleanPunctuator() - /** Set up Cluster Service */ - initializeClusterService() - } - - override fun close() { - log.info( - "closing prioritization processor applicationId(${processorContext.applicationId()}), " + - "taskId(${processorContext.taskId()})" - ) - expiryCancellable.cancel() - cleanCancellable.cancel() - } - - open fun initializeExpiryPunctuator() { - val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService) - expiryPunctuator.processorContext = processorContext - expiryPunctuator.configuration = prioritizationConfiguration - val expiryConfiguration = prioritizationConfiguration.expiryConfiguration - expiryCancellable = processorContext.schedule( - Duration.ofMillis(expiryConfiguration.frequencyMilli), - PunctuationType.WALL_CLOCK_TIME, expiryPunctuator - ) - log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec") - } - - open fun initializeCleanPunctuator() { - val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService) - cleanPunctuator.processorContext = processorContext - cleanPunctuator.configuration = prioritizationConfiguration - val cleanConfiguration = prioritizationConfiguration.cleanConfiguration - cleanCancellable = processorContext.schedule( - Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()), - PunctuationType.WALL_CLOCK_TIME, cleanPunctuator - ) - log.info( - "Clean punctuator setup complete with expiry " + - "hold(${cleanConfiguration.expiredRecordsHoldDays})days" - ) + override suspend fun output(id: String) { + log.info("$$$$$ received in output processor id($id)") + val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name) + /** Check for Kafka Processing, If yes, then send to the output topic */ + if (this.processorContext != null) { + processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) + } } open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) { @@ -126,10 +83,11 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA ) /** Get all previously received messages from database for group and optional types and correlation Id */ - val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages( - group, - arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId - ) + val waitingCorrelatedStoreMessages = messagePrioritizationStateService + .getCorrelatedMessages( + group, + arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId + ) /** If multiple records found, then check correlation */ if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) { @@ -139,12 +97,9 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA if (correlationResults.correlated) { /** Correlation satisfied */ - val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",") - /** Send only correlated ids to next processor */ - this.processorContext.forward( - UUID.randomUUID().toString(), correlatedIds, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) - ) + val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id } + /** Send only correlated ids to aggregate processor */ + aggregate(correlatedIds) } else { /** Correlation not satisfied */ log.trace("correlation not matched : ${correlationResults.message}") @@ -159,16 +114,57 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA } else { // No Correlation check needed, simply forward to next processor. messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name) - this.processorContext.forward( - messagePrioritization.id, messagePrioritization.id, - To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) - ) + aggregate(messagePrioritization.id) + } + } + + open suspend fun aggregate(strIds: String) { + log.info("@@@@@ received in aggregation processor ids($strIds)") + val ids = strIds.split(",").map { it.trim() } + if (!ids.isNullOrEmpty()) { + try { + if (ids.size == 1) { + /** No aggregation or sequencing needed, simpley forward to next processor */ + output(ids.first()) + } else { + /** Implement Aggregation logic in overridden class, If necessary, + Populate New Message and Update status with Prioritized, Forward the message to next processor */ + handleAggregation(ids) + /** Update all messages to Aggregated state */ + messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) + } + } catch (e: Exception) { + val error = "failed in Aggregate message($ids) : ${e.message}" + log.error(error, e) + val storeMessages = messagePrioritizationStateService.getMessages(ids) + if (!storeMessages.isNullOrEmpty()) { + storeMessages.forEach { messagePrioritization -> + try { + /** Update the data store */ + messagePrioritizationStateService.setMessageStateANdError( + messagePrioritization.id, + MessageState.ERROR.name, error + ) + /** Publish to output topic */ + output(messagePrioritization.id) + } catch (sendException: Exception) { + log.error( + "failed to update/publish error message(${messagePrioritization.id}) : " + + "${sendException.message}", e + ) + } + } + } + } } } + /** Child will override this implementation , if necessary + * Here the place child has to implement custom Sequencing and Aggregation logic. + * */ + abstract suspend fun handleAggregation(messageIds: List<String>) + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, * otherwise correlation happens with group and correlationId */ - open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { - return null - } + abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt index 017658ff6..d9cd956bf 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt @@ -16,6 +16,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository @@ -27,59 +28,10 @@ import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional import java.util.Date -interface MessagePrioritizationStateService { - - suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization - - suspend fun getMessage(id: String): MessagePrioritization - - suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? - - suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? - - suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? - - suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? - - suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>? - - suspend fun getCorrelatedMessages( - group: String, - states: List<String>, - types: List<String>?, - correlationIds: String - ): List<MessagePrioritization>? - - suspend fun updateMessagesState(ids: List<String>, state: String) - - suspend fun updateMessageState(id: String, state: String): MessagePrioritization - - suspend fun setMessageState(id: String, state: String) - - suspend fun setMessagesPriority(ids: List<String>, priority: String) - - suspend fun setMessagesState(ids: List<String>, state: String) - - suspend fun setMessageStateANdError(id: String, state: String, error: String) - - suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) - - suspend fun deleteMessage(id: String) - - suspend fun deleteMessageByGroup(group: String) - - suspend fun deleteMessageStates(group: String, states: List<String>) - - suspend fun deleteExpiredMessage(group: String, retentionDays: Int) -} - @Service open class MessagePrioritizationStateServiceImpl( private val prioritizationMessageRepository: PrioritizationMessageRepository -) : - MessagePrioritizationStateService { +) : MessagePrioritizationStateService { private val log = logger(MessagePrioritizationStateServiceImpl::class) @@ -110,7 +62,7 @@ open class MessagePrioritizationStateServiceImpl( } override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? { + List<MessagePrioritization>? { return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate( group, states, Date(), PageRequest.of(0, count) @@ -118,7 +70,7 @@ open class MessagePrioritizationStateServiceImpl( } override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): - List<MessagePrioritization>? { + List<MessagePrioritization>? { return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate( group, states, Date(), PageRequest.of(0, count) @@ -126,7 +78,7 @@ open class MessagePrioritizationStateServiceImpl( } override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): - List<MessagePrioritization>? { + List<MessagePrioritization>? { return prioritizationMessageRepository.findByByGroupAndExpiredDate( group, expiryDate, PageRequest.of(0, count) diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt new file mode 100644 index 000000000..fcdb71cda --- /dev/null +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt @@ -0,0 +1,46 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service + +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : + AbstractMessagePrioritizationService(messagePrioritizationStateService) { + + private val log = logger(DefaultMessagePrioritizeProcessor::class) + + /** Child overriding this implementation , if necessary */ + override suspend fun handleAggregation(messageIds: List<String>) { + log.info("messages($messageIds) aggregated") + messageIds.forEach { id -> + output(id) + } + } + + /** If consumer wants specific correlation with respect to group and types, then populate the specific types, + * otherwise correlation happens with group and correlationId */ + override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { + return when (messagePrioritization.group) { + /** Dummy Implementation, This can also be read from file and stored as cached map **/ + "group-typed" -> arrayListOf("type-0", "type-1", "type-2") + else -> null + } + } +} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt deleted file mode 100644 index 3e697e633..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() { - - private val log = logger(MessageAggregateProcessor::class) - - override suspend fun processNB(key: String, value: String) { - - log.info("@@@@@ received in aggregation processor key($key), value($value)") - val ids = value.split(",").map { it.trim() } - if (!ids.isNullOrEmpty()) { - try { - if (ids.size == 1) { - processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) - } else { - /** Implement Aggregation logic in overridden class, If necessary, - Populate New Message and Update status with Prioritized, Forward the message to next processor */ - handleAggregation(ids) - /** Update all messages to Aggregated state */ - messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name) - } - } catch (e: Exception) { - val error = "failed in Aggregate message($ids) : ${e.message}" - log.error(error, e) - val storeMessages = messagePrioritizationStateService.getMessages(ids) - if (!storeMessages.isNullOrEmpty()) { - storeMessages.forEach { messagePrioritization -> - try { - /** Update the data store */ - messagePrioritizationStateService.setMessageStateANdError( - messagePrioritization.id, - MessageState.ERROR.name, error - ) - /** Publish to Error topic */ - this.processorContext.forward( - messagePrioritization.id, messagePrioritization, - To.child(MessagePrioritizationConstants.SINK_OUTPUT) - ) - } catch (sendException: Exception) { - log.error( - "failed to update/publish error message(${messagePrioritization.id}) : " + - "${sendException.message}", e - ) - } - } - } - } - } - } - - /** Child will override this implementation , if necessary */ - open suspend fun handleAggregation(messageIds: List<String>) { - log.info("messages($messageIds) aggregated") - messageIds.forEach { id -> - processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT)) - } - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt deleted file mode 100644 index cf6520df5..000000000 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology - -import org.apache.kafka.streams.processor.To -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState -import org.onap.ccsdk.cds.controllerblueprints.core.logger - -open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() { - - private val log = logger(MessageOutputProcessor::class) - - override suspend fun processNB(key: String, value: String) { - log.info("$$$$$ received in output processor key($key), value($value)") - val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name) - processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT)) - } -} diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index 49230b6e4..186499d66 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -17,28 +17,27 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils import org.apache.kafka.streams.processor.ProcessorSupplier -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService object MessageProcessorUtils { - /** Utility to create the cluster lock for message [messagePrioritization] */ - suspend fun prioritizationGrouplock( - clusterService: BluePrintClusterService?, - messagePrioritization: MessagePrioritization - ): ClusterLock? { + /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/ + suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? { + val clusterService = BluePrintDependencyService.optionalClusterService() + return if (clusterService != null && clusterService.clusterJoined() && !messagePrioritization.correlationId.isNullOrBlank() ) { // Get the correlation key in ascending order, even it it is misplaced val correlationId = messagePrioritization.toFormatedCorrelation() - val lockName = "prioritization-${messagePrioritization.group}-$correlationId" + val lockName = "prioritize::${messagePrioritization.group}::$correlationId" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") @@ -46,14 +45,15 @@ object MessageProcessorUtils { } else null } - /** Utility used to cluster unlock for message [messagePrioritization] */ - suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) { - if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) { + /** Utility used to cluster unlock for message [clusterLock] */ + suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) { + if (clusterLock != null) { clusterLock.unLock() clusterLock.close() } } + /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/ fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration): ProcessorSupplier<K, V> { return ProcessorSupplier<K, V> { diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index f9e23e826..ec0515c42 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -27,12 +27,13 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest @@ -43,6 +44,7 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.Test import kotlin.test.assertNotNull +import kotlin.test.assertTrue @RunWith(SpringRunner::class) @DataJpaTest @@ -72,6 +74,8 @@ import kotlin.test.assertNotNull ) open class MessagePrioritizationConsumerTest { + private val log = logger(MessagePrioritizationConsumerTest::class) + @Autowired lateinit var applicationContext: ApplicationContext @@ -82,6 +86,9 @@ open class MessagePrioritizationConsumerTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Autowired + lateinit var messagePrioritizationService: MessagePrioritizationService + + @Autowired lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer @Before @@ -107,6 +114,38 @@ open class MessagePrioritizationConsumerTest { } @Test + fun testMessagePrioritizationService() { + runBlocking { + assertTrue( + ::messagePrioritizationService.isInitialized, + "failed to initialize messagePrioritizationService" + ) + + log.info("**************** without Correlation **************") + /** Checking without correlation */ + MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach { + messagePrioritizationService.prioritize(it) + } + log.info("**************** Same Group , with Correlation **************") + /** checking same group with correlation */ + MessagePrioritizationSample + .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + log.info("**************** Different Type , with Correlation **************") + /** checking different type, with correlation */ + MessagePrioritizationSample + .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3) + .forEach { + delay(10) + messagePrioritizationService.prioritize(it) + } + } + } + + @Test fun testStartConsuming() { runBlocking { val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration() @@ -118,7 +157,9 @@ open class MessagePrioritizationConsumerTest { val spyStreamingConsumerService = spyk(streamingConsumerService) coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit coEvery { spyStreamingConsumerService.shutDown() } returns Unit - val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) + val messagePrioritizationConsumer = MessagePrioritizationConsumer( + bluePrintMessageLibPropertyService + ) val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer) // Test Topology diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt index 3d3d0c6f5..0285079ad 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt @@ -17,10 +17,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor -import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean @@ -65,22 +64,17 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio */ @Service -open class SampleMessagePrioritizationConsumer( +open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) : + SampleMessagePrioritizationService(messagePrioritizationStateService) + +/** For Kafka Consumer **/ +@Service +open class TestMessagePrioritizationConsumer( bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService ) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService) @Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE) -open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() { - override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? { - return when (messagePrioritization.group) { - "group-typed" -> arrayListOf("type-0", "type-1", "type-2") - else -> null - } - } -} - -@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE) -open class SampleMessageAggregateProcessor() : MessageAggregateProcessor() - -@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT) -open class SampleMessageOutputProcessor : MessageOutputProcessor() +open class TestMessagePrioritizeProcessor( + messagePrioritizationStateService: MessagePrioritizationStateService, + messagePrioritizationService: MessagePrioritizationService +) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService) |