summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-01-08 12:36:42 -0500
committerBrinda Santh <bs2796@att.com>2020-01-08 12:36:42 -0500
commit863de6175fa7912930e5ee7e90215f6f3e820f36 (patch)
treece2d4d2c3f0012c3a25daa767aa6768bdb99fe31 /ms/blueprintsprocessor/functions
parent0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 (diff)
Rest endpoint for message Prioritization
Refactored to support both Kafka and Rest endpoint prioritization. Simplified number for Kafka processors. Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: Iba77ed94be3398940840ff01a298f0bec785401f
Diffstat (limited to 'ms/blueprintsprocessor/functions')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/README.md5
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt29
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt68
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt1
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt17
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt)27
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt115
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt)38
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt)6
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt)2
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt)164
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt)58
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt46
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt80
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt34
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt22
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt45
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt32
19 files changed, 440 insertions, 352 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
index 482bbc2cc..cda43faca 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
@@ -3,7 +3,6 @@ To Delete Topics
------------------
kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic
kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic
-kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic
kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog
Create Topics
@@ -11,7 +10,6 @@ Create Topics
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
-kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic
To List topics
----------------
@@ -26,6 +24,3 @@ To Listen for Output
kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
-
-kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning
-
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
index 28e096352..890e0a6ba 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
@@ -28,9 +28,6 @@ object MessagePrioritizationConstants {
const val SOURCE_INPUT = "source-prioritization-input"
const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
- const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate"
- const val PROCESSOR_OUTPUT = "processor-prioritization-output"
const val SINK_OUTPUT = "sink-prioritization-output"
- const val SINK_EXPIRED = "sink-prioritization-expired"
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
new file mode 100644
index 000000000..584fd00d3
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+
+interface MessagePrioritizationService {
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+
+ suspend fun prioritize(messagePrioritization: MessagePrioritization)
+
+ suspend fun output(id: String)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
new file mode 100644
index 000000000..5dd41d7f3
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.Date
+
+interface MessagePrioritizationStateService {
+
+ suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+ suspend fun getMessage(id: String): MessagePrioritization
+
+ suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
+ suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+ suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>?
+
+ suspend fun updateMessagesState(ids: List<String>, state: String)
+
+ suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+ suspend fun setMessageState(id: String, state: String)
+
+ suspend fun setMessagesPriority(ids: List<String>, priority: String)
+
+ suspend fun setMessagesState(ids: List<String>, state: String)
+
+ suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+ suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
+
+ suspend fun deleteMessage(id: String)
+
+ suspend fun deleteMessageByGroup(group: String)
+
+ suspend fun deleteMessageStates(group: String, states: List<String>)
+
+ suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index bef7a7b61..05b820adb 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -17,7 +17,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
index 262dcb402..e90771fb8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
@@ -16,9 +16,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
@@ -31,7 +32,10 @@ import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping(value = ["/api/v1/message-prioritization"])
-open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) {
+open class MessagePrioritizationApi(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) {
@GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
@ResponseBody
@@ -53,6 +57,15 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic
}
@PostMapping(
+ path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
+ @ResponseBody
+ fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
+ messagePrioritizationService.prioritize(messagePrioritization)
+ }
+
+ @PostMapping(
path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
consumes = [MediaType.APPLICATION_JSON_VALUE]
)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
index 35566abb4..656646ff7 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
@@ -14,38 +14,21 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */
+/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
private val log = logger(AbstractMessagePrioritizeProcessor::class)
lateinit var prioritizationConfiguration: PrioritizationConfiguration
- lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
- var clusterService: BluePrintClusterService? = null
- override fun init(context: ProcessorContext) {
- this.processorContext = context
- /** Get the State service to update in store */
- this.messagePrioritizationStateService = BluePrintDependencyService
- .messagePrioritizationStateService()
- }
-
- /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
- open fun initializeClusterService() {
- /** Get the Cluster service to update in store */
- if (BluePrintConstants.CLUSTER_ENABLED) {
- this.clusterService = BluePrintDependencyService.clusterService()
- }
+ override fun init(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
new file mode 100644
index 000000000..c14a404ad
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.Cancellable
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.PunctuationType
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.time.Duration
+
+open class DefaultMessagePrioritizeProcessor(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ lateinit var expiryCancellable: Cancellable
+ lateinit var cleanCancellable: Cancellable
+
+ override suspend fun processNB(key: ByteArray, value: ByteArray) {
+
+ val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ try {
+ messagePrioritizationService.setKafkaProcessorContext(processorContext)
+ messagePrioritizationService.prioritize(messagePrioritize)
+ } catch (e: Exception) {
+ messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+ log.error(messagePrioritize.error)
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
+ /** Publish to Output topic */
+ this.processorContext.forward(
+ messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ super.init(context)
+ /** set up expiry marking cron */
+ initializeExpiryPunctuator()
+ /** Set up cleaning records cron */
+ initializeCleanPunctuator()
+ }
+
+ override fun close() {
+ log.info(
+ "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
+ expiryCancellable.cancel()
+ cleanCancellable.cancel()
+ }
+
+ open fun initializeExpiryPunctuator() {
+ val expiryPunctuator =
+ MessagePriorityExpiryPunctuator(
+ messagePrioritizationStateService
+ )
+ expiryPunctuator.processorContext = processorContext
+ expiryPunctuator.configuration = prioritizationConfiguration
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ expiryCancellable = processorContext.schedule(
+ Duration.ofMillis(expiryConfiguration.frequencyMilli),
+ PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+ )
+ log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
+ }
+
+ open fun initializeCleanPunctuator() {
+ val cleanPunctuator =
+ MessagePriorityCleanPunctuator(
+ messagePrioritizationStateService
+ )
+ cleanPunctuator.processorContext = processorContext
+ cleanPunctuator.configuration = prioritizationConfiguration
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ cleanCancellable = processorContext.schedule(
+ Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+ PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+ )
+ log.info(
+ "Clean punctuator setup complete with expiry " +
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
index b611060f7..d7666a20b 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
@@ -14,11 +14,12 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -42,7 +43,7 @@ open class MessagePrioritizationConsumer(
}
open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
- KafkaStreamConsumerFunction {
+ KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
override suspend fun createTopology(
@@ -52,7 +53,7 @@ open class MessagePrioritizationConsumer(
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
@@ -68,39 +69,12 @@ open class MessagePrioritizationConsumer(
MessagePrioritizationConstants.SOURCE_INPUT
)
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE
- )
-
- topology.addSink(
- MessagePrioritizationConstants.SINK_EXPIRED,
- prioritizationConfiguration.expiredTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
/** To receive completed and error messages */
topology.addSink(
MessagePrioritizationConstants.SINK_OUTPUT,
prioritizationConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- MessagePrioritizationConstants.PROCESSOR_OUTPUT
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
)
// Output will be sent to the group-output topic from Processor API
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
index 5435ebe30..e27cf16d0 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
@@ -14,13 +14,13 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.To
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -46,7 +46,7 @@ class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateServ
fetchMessages.forEach { expired ->
processorContext.forward(
expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_EXPIRED)
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
)
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
index f2a481f74..5595863d4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index 4e4e2da7a..13c0dd7bc 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -14,43 +14,42 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
-import java.util.UUID
-open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
- private val log = logger(MessagePrioritizeProcessor::class)
+ private val log = logger(AbstractMessagePrioritizationService::class)
- lateinit var expiryCancellable: Cancellable
- lateinit var cleanCancellable: Cancellable
+ var processorContext: ProcessorContext? = null
- override suspend fun processNB(key: ByteArray, value: ByteArray) {
- log.info("***** received in prioritize processor key(${String(key)})")
- val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
+ log.info("***** received in prioritize processor key(${messagePrioritize.id})")
/** Get the cluster lock for message group */
- val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
/** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -59,58 +58,16 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
messagePrioritize.id, MessageState.ERROR.name,
messagePrioritize.error!!
)
- /** Publish to Output topic */
- this.processorContext.forward(
- messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
}
}
- override fun init(context: ProcessorContext) {
- super.init(context)
- /** set up expiry marking cron */
- initializeExpiryPunctuator()
- /** Set up cleaning records cron */
- initializeCleanPunctuator()
- /** Set up Cluster Service */
- initializeClusterService()
- }
-
- override fun close() {
- log.info(
- "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- expiryCancellable.cancel()
- cleanCancellable.cancel()
- }
-
- open fun initializeExpiryPunctuator() {
- val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
- expiryPunctuator.processorContext = processorContext
- expiryPunctuator.configuration = prioritizationConfiguration
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(
- Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
- )
- log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
- }
-
- open fun initializeCleanPunctuator() {
- val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
- cleanPunctuator.processorContext = processorContext
- cleanPunctuator.configuration = prioritizationConfiguration
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(
- Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
- )
- log.info(
- "Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
- )
+ override suspend fun output(id: String) {
+ log.info("$$$$$ received in output processor id($id)")
+ val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
+ /** Check for Kafka Processing, If yes, then send to the output topic */
+ if (this.processorContext != null) {
+ processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
}
open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
@@ -126,10 +83,11 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
)
/** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
- group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
- )
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+ .getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
/** If multiple records found, then check correlation */
if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
@@ -139,12 +97,9 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
if (correlationResults.correlated) {
/** Correlation satisfied */
- val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
- /** Send only correlated ids to next processor */
- this.processorContext.forward(
- UUID.randomUUID().toString(), correlatedIds,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
+ /** Send only correlated ids to aggregate processor */
+ aggregate(correlatedIds)
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
@@ -159,16 +114,57 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
} else {
// No Correlation check needed, simply forward to next processor.
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization.id,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ aggregate(messagePrioritization.id)
+ }
+ }
+
+ open suspend fun aggregate(strIds: String) {
+ log.info("@@@@@ received in aggregation processor ids($strIds)")
+ val ids = strIds.split(",").map { it.trim() }
+ if (!ids.isNullOrEmpty()) {
+ try {
+ if (ids.size == 1) {
+ /** No aggregation or sequencing needed, simpley forward to next processor */
+ output(ids.first())
+ } else {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(ids)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ }
+ } catch (e: Exception) {
+ val error = "failed in Aggregate message($ids) : ${e.message}"
+ log.error(error, e)
+ val storeMessages = messagePrioritizationStateService.getMessages(ids)
+ if (!storeMessages.isNullOrEmpty()) {
+ storeMessages.forEach { messagePrioritization ->
+ try {
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritization.id,
+ MessageState.ERROR.name, error
+ )
+ /** Publish to output topic */
+ output(messagePrioritization.id)
+ } catch (sendException: Exception) {
+ log.error(
+ "failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}", e
+ )
+ }
+ }
+ }
+ }
}
}
+ /** Child will override this implementation , if necessary
+ * Here the place child has to implement custom Sequencing and Aggregation logic.
+ * */
+ abstract suspend fun handleAggregation(messageIds: List<String>)
+
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
* otherwise correlation happens with group and correlationId */
- open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return null
- }
+ abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
index 017658ff6..d9cd956bf 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
@@ -16,6 +16,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
@@ -27,59 +28,10 @@ import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.Date
-interface MessagePrioritizationStateService {
-
- suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
-
- suspend fun getMessage(id: String): MessagePrioritization
-
- suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
-
- suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
-
- suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
-
- suspend fun getCorrelatedMessages(
- group: String,
- states: List<String>,
- types: List<String>?,
- correlationIds: String
- ): List<MessagePrioritization>?
-
- suspend fun updateMessagesState(ids: List<String>, state: String)
-
- suspend fun updateMessageState(id: String, state: String): MessagePrioritization
-
- suspend fun setMessageState(id: String, state: String)
-
- suspend fun setMessagesPriority(ids: List<String>, priority: String)
-
- suspend fun setMessagesState(ids: List<String>, state: String)
-
- suspend fun setMessageStateANdError(id: String, state: String, error: String)
-
- suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
-
- suspend fun deleteMessage(id: String)
-
- suspend fun deleteMessageByGroup(group: String)
-
- suspend fun deleteMessageStates(group: String, states: List<String>)
-
- suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
-}
-
@Service
open class MessagePrioritizationStateServiceImpl(
private val prioritizationMessageRepository: PrioritizationMessageRepository
-) :
- MessagePrioritizationStateService {
+) : MessagePrioritizationStateService {
private val log = logger(MessagePrioritizationStateServiceImpl::class)
@@ -110,7 +62,7 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
group,
states, Date(), PageRequest.of(0, count)
@@ -118,7 +70,7 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
group,
states, Date(), PageRequest.of(0, count)
@@ -126,7 +78,7 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByByGroupAndExpiredDate(
group,
expiryDate, PageRequest.of(0, count)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
new file mode 100644
index 000000000..fcdb71cda
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messageIds: List<String>) {
+ log.info("messages($messageIds) aggregated")
+ messageIds.forEach { id ->
+ output(id)
+ }
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return when (messagePrioritization.group) {
+ /** Dummy Implementation, This can also be read from file and stored as cached map **/
+ "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ else -> null
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
deleted file mode 100644
index 3e697e633..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
- private val log = logger(MessageAggregateProcessor::class)
-
- override suspend fun processNB(key: String, value: String) {
-
- log.info("@@@@@ received in aggregation processor key($key), value($value)")
- val ids = value.split(",").map { it.trim() }
- if (!ids.isNullOrEmpty()) {
- try {
- if (ids.size == 1) {
- processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- } else {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(ids)
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
- }
- } catch (e: Exception) {
- val error = "failed in Aggregate message($ids) : ${e.message}"
- log.error(error, e)
- val storeMessages = messagePrioritizationStateService.getMessages(ids)
- if (!storeMessages.isNullOrEmpty()) {
- storeMessages.forEach { messagePrioritization ->
- try {
- /** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(
- messagePrioritization.id,
- MessageState.ERROR.name, error
- )
- /** Publish to Error topic */
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- } catch (sendException: Exception) {
- log.error(
- "failed to update/publish error message(${messagePrioritization.id}) : " +
- "${sendException.message}", e
- )
- }
- }
- }
- }
- }
- }
-
- /** Child will override this implementation , if necessary */
- open suspend fun handleAggregation(messageIds: List<String>) {
- log.info("messages($messageIds) aggregated")
- messageIds.forEach { id ->
- processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
deleted file mode 100644
index cf6520df5..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
- private val log = logger(MessageOutputProcessor::class)
-
- override suspend fun processNB(key: String, value: String) {
- log.info("$$$$$ received in output processor key($key), value($value)")
- val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
- processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 49230b6e4..186499d66 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -17,28 +17,27 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
import org.apache.kafka.streams.processor.ProcessorSupplier
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
- /** Utility to create the cluster lock for message [messagePrioritization] */
- suspend fun prioritizationGrouplock(
- clusterService: BluePrintClusterService?,
- messagePrioritization: MessagePrioritization
- ): ClusterLock? {
+ /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
+ suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+
return if (clusterService != null && clusterService.clusterJoined() &&
!messagePrioritization.correlationId.isNullOrBlank()
) {
// Get the correlation key in ascending order, even it it is misplaced
val correlationId = messagePrioritization.toFormatedCorrelation()
- val lockName = "prioritization-${messagePrioritization.group}-$correlationId"
+ val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
@@ -46,14 +45,15 @@ object MessageProcessorUtils {
} else null
}
- /** Utility used to cluster unlock for message [messagePrioritization] */
- suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
- if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+ /** Utility used to cluster unlock for message [clusterLock] */
+ suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) {
+ if (clusterLock != null) {
clusterLock.unLock()
clusterLock.close()
}
}
+ /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index f9e23e826..ec0515c42 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -27,12 +27,13 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
@@ -43,6 +44,7 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@@ -72,6 +74,8 @@ import kotlin.test.assertNotNull
)
open class MessagePrioritizationConsumerTest {
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
@Autowired
lateinit var applicationContext: ApplicationContext
@@ -82,6 +86,9 @@ open class MessagePrioritizationConsumerTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Autowired
+ lateinit var messagePrioritizationService: MessagePrioritizationService
+
+ @Autowired
lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
@Before
@@ -107,6 +114,38 @@ open class MessagePrioritizationConsumerTest {
}
@Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ assertTrue(
+ ::messagePrioritizationService.isInitialized,
+ "failed to initialize messagePrioritizationService"
+ )
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
+ @Test
fun testStartConsuming() {
runBlocking {
val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
@@ -118,7 +157,9 @@ open class MessagePrioritizationConsumerTest {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService
+ )
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Test Topology
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 3d3d0c6f5..0285079ad 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -17,10 +17,9 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
@@ -65,22 +64,17 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
*/
@Service
-open class SampleMessagePrioritizationConsumer(
+open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+
+/** For Kafka Consumer **/
+@Service
+open class TestMessagePrioritizationConsumer(
bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return when (messagePrioritization.group) {
- "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
- else -> null
- }
- }
-}
-
-@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
-
-@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class SampleMessageOutputProcessor : MessageOutputProcessor()
+open class TestMessagePrioritizeProcessor(
+ messagePrioritizationStateService: MessagePrioritizationStateService,
+ messagePrioritizationService: MessagePrioritizationService
+) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)