summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion/src
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt84
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt6
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt)15
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt85
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt91
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt45
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt82
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt14
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt10
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt133
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt15
15 files changed, 514 insertions, 108 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index 8345df523..424929b82 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable {
lateinit var shutDownConfiguration: ShutDownConfiguration
lateinit var cleanConfiguration: CleanConfiguration
var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+ var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration
}
open class KafkaConfiguration : Serializable {
@@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable {
lateinit var outputTopic: String // Publish Configuration Selector
}
+open class NatsConfiguration : Serializable {
+ lateinit var connectionSelector: String // Consumer Configuration Selector
+ lateinit var inputSubject: String // Publish Configuration Selector
+ lateinit var expiredSubject: String // Publish Configuration Selector
+ lateinit var outputSubject: String // Publish Configuration Selector
+}
+
open class ExpiryConfiguration : Serializable {
var frequencyMilli: Long = 30000L
var maxPollRecord: Int = 1000
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
index 464f97a88..dfe516953 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -16,21 +16,22 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-import org.apache.kafka.streams.processor.ProcessorContext
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
interface MessagePrioritizationService {
- fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+ fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration)
+
+ fun getConfiguration(): PrioritizationConfiguration
suspend fun prioritize(messagePrioritization: MessagePrioritization)
/** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
suspend fun output(messages: List<MessagePrioritization>)
- /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */
- suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration)
+ /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */
+ suspend fun updateExpiredMessages()
- /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */
- suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration)
+ /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */
+ suspend fun cleanExpiredMessage()
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
new file mode 100644
index 000000000..112a80379
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractKafkaMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractKafkaMessagePrioritizationService::class)
+
+ lateinit var processorContext: ProcessorContext
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ processorContext.forward(
+ updatedMessage.id,
+ updatedMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ processorContext.forward(
+ expiredMessage.id, expiredMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
index 656646ff7..d4f8470c8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
@@ -17,17 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
- private val log = logger(AbstractMessagePrioritizeProcessor::class)
-
- lateinit var prioritizationConfiguration: PrioritizationConfiguration
-
override fun init(processorContext: ProcessorContext) {
this.processorContext = processorContext
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
index 624a69fd4..1b0612492 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
open class DefaultMessagePrioritizeProcessor(
private val messagePrioritizationStateService: MessagePrioritizationStateService,
- private val messagePrioritizationService: MessagePrioritizationService
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
private val log = logger(DefaultMessagePrioritizeProcessor::class)
@@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor(
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
- messagePrioritizationService.prioritize(messagePrioritize)
+ kafkaMessagePrioritizationService.prioritize(messagePrioritize)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor(
override fun init(context: ProcessorContext) {
super.init(context)
/** Set Configuration and Processor Context to messagePrioritizationService */
- messagePrioritizationService.setKafkaProcessorContext(processorContext)
+ if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) {
+ kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext)
+ } else {
+ throw BluePrintProcessorException(
+ "messagePrioritizationService is not instance of " +
+ "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}"
+ )
+ }
}
override fun close() {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
index fb7cfd110..d5ec0233a 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
@@ -30,13 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
-open class MessagePrioritizationConsumer(
- private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+open class KafkaMessagePrioritizationConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
) {
- private val log = logger(MessagePrioritizationConsumer::class)
+ private val log = logger(KafkaMessagePrioritizationConsumer::class)
- lateinit var streamingConsumerService: BlueprintMessageConsumerService
+ private lateinit var streamingConsumerService: BlueprintMessageConsumerService
open fun consumerService(selector: String): BlueprintMessageConsumerService {
return bluePrintMessageLibPropertyService
@@ -67,8 +69,7 @@ open class MessagePrioritizationConsumer(
topology.addProcessor(
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
bluePrintProcessorSupplier<ByteArray, ByteArray>(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- prioritizationConfiguration
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
),
MessagePrioritizationConstants.SOURCE_INPUT
)
@@ -100,7 +101,7 @@ open class MessagePrioritizationConsumer(
}
suspend fun shutDown() {
- if (streamingConsumerService != null) {
+ if (::streamingConsumerService.isInitialized) {
streamingConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
new file mode 100644
index 000000000..502a7822d
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractNatsMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractNatsMessagePrioritizationService::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ updatedMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (!expiredIds.isNullOrEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ expiredMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
new file mode 100644
index 000000000..20da2c28c
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.Subscription
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsMessagePrioritizationConsumer(
+ private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
+ private val natsMessagePrioritizationService: MessagePrioritizationService
+) {
+ private val log = logger(NatsMessagePrioritizationConsumer::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+ private lateinit var subscription: Subscription
+
+ suspend fun startConsuming() {
+ val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration()
+ val natsConfiguration = prioritizationConfiguration.natsConfiguration
+ ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration")
+
+ check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) {
+ "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService."
+ }
+ bluePrintNatsService = consumerService(natsConfiguration.connectionSelector)
+ natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService
+ val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)
+ val loadBalanceGroup = ClusterUtils.applicationName()
+ val messageHandler = createMessageHandler()
+ val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject))
+ subscription = bluePrintNatsService.loadBalanceSubscribe(
+ inputSubject,
+ loadBalanceGroup,
+ messageHandler,
+ subscriptionOptions
+ )
+ log.info(
+ "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)."
+ )
+ }
+
+ suspend fun shutDown() {
+ if (::subscription.isInitialized) {
+ subscription.unsubscribe()
+ }
+ log.info("Nats prioritization consumer listener shutdown complete")
+ }
+
+ private fun consumerService(selector: String): BluePrintNatsService {
+ return bluePrintNatsLibPropertyService.bluePrintNatsService(selector)
+ }
+
+ private fun createMessageHandler(): MessageHandler {
+ return MessageHandler { message ->
+ try {
+ val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java)
+ runBlocking {
+ natsMessagePrioritizationService.prioritize(messagePrioritization)
+ }
+ } catch (e: Exception) {
+ log.error("failed to process prioritize message", e)
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index 931403200..a6963d83f 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -16,14 +16,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
@@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService(
private val log = logger(AbstractMessagePrioritizationService::class)
- var processorContext: ProcessorContext? = null
+ lateinit var prioritizationConfiguration: PrioritizationConfiguration
- override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
- this.processorContext = processorContext
+ override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+ this.prioritizationConfiguration = prioritizationConfiguration
+ }
+
+ override fun getConfiguration(): PrioritizationConfiguration {
+ return this.prioritizationConfiguration
}
override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
/** Get the cluster lock for message group */
val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
@@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService(
override suspend fun output(messages: List<MessagePrioritization>) {
log.info("$$$$$ received in output processor id(${messages.ids()})")
messages.forEach { message ->
- val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
- /** Check for Kafka Processing, If yes, then send to the output topic */
- if (this.processorContext != null) {
- processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
}
}
- override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+ override suspend fun updateExpiredMessages() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
try {
val fetchMessages = messagePrioritizationStateService
.getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
val expiredIds = fetchMessages?.ids()
- if (expiredIds != null && expiredIds.isNotEmpty()) {
+ if (!expiredIds.isNullOrEmpty()) {
messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- if (processorContext != null) {
- fetchMessages.forEach { expired ->
- expired.state = MessageState.EXPIRED.name
- processorContext!!.forward(
- expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
}
} catch (e: Exception) {
log.error("failed in updating expired messages", e)
@@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService(
}
}
- override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+ override suspend fun cleanExpiredMessage() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
try {
messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
index b1c1fb15f..2f08c1c34 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
@@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService(
}
*/
- open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+ open suspend fun startScheduling() {
+ val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
+
log.info("Starting Prioritization Scheduler Service...")
GlobalScope.launch {
expiryScheduler(prioritizationConfiguration)
@@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService(
withContext(Dispatchers.Default) {
while (keepGoing) {
try {
- messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
+ messagePrioritizationService.updateExpiredMessages()
delay(expiryConfiguration.frequencyMilli)
} catch (e: Exception) {
log.error("failed in prioritization expiry scheduler", e)
@@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService(
withContext(Dispatchers.Default) {
while (keepGoing) {
try {
- messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
+ messagePrioritizationService.cleanExpiredMessage()
delay(cleanConfiguration.frequencyMilli)
} catch (e: Exception) {
log.error("failed in prioritization clean scheduler", e)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
index b7d878e4a..305e64ba4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -16,11 +16,13 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
AbstractMessagePrioritizationService(messagePrioritizationStateService) {
- private val log = logger(DefaultMessagePrioritizeProcessor::class)
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) {
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) {
/** Child overriding this implementation , if necessary */
override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+class SampleMessagePrioritizationHandler(
+ private val messagePrioritizationService: MessagePrioritizationService,
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) {
+
+ private val log = logger(SampleMessagePrioritizationHandler::class)
+
+ suspend fun handleAggregation(messages: List<MessagePrioritization>) {
log.info("messages(${messages.ids()}) aggregated")
/** Sequence based on Priority and Updated Date */
val sequencedMessage = messages.orderByHighestPriority()
/** Update all messages to aggregated state */
- messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name)
- output(sequencedMessage)
+ messagePrioritizationStateService.setMessagesState(
+ sequencedMessage.ids(),
+ MessageState.AGGREGATED.name
+ )
+ messagePrioritizationService.output(sequencedMessage)
}
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return when (messagePrioritization.group) {
/** Dummy Implementation, This can also be read from file and stored as cached map **/
"group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ "pass-typed" -> arrayListOf(messagePrioritization.type)
else -> null
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
index e497ef144..2c4ae30da 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import java.util.Calendar
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
import java.util.Date
import java.util.UUID
@@ -35,6 +37,12 @@ object MessagePrioritizationSample {
outputTopic = "prioritize-output-topic"
expiredTopic = "prioritize-expired-topic"
}
+ natsConfiguration = NatsConfiguration().apply {
+ connectionSelector = "cds-controller"
+ inputSubject = "prioritize-input"
+ outputSubject = "prioritize-output"
+ expiredSubject = "prioritize-expired"
+ }
expiryConfiguration = ExpiryConfiguration().apply {
frequencyMilli = 10000L
maxPollRecord = 2000
@@ -66,9 +74,7 @@ object MessagePrioritizationSample {
}
private fun currentDatePlusDays(days: Int): Date {
- val calender = Calendar.getInstance()
- calender.add(Calendar.DATE, days)
- return calender.time
+ return controllerDate().addDate(days)
}
fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 18b3e4dd7..9100fb51c 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
@@ -77,14 +76,11 @@ object MessageProcessorUtils {
}
}
- /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
- fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
- ProcessorSupplier<K, V> {
+ /** Get the Kafka Supplier for processor lookup [name] **/
+ fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
- val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
- processorInstance.prioritizationConfiguration = prioritizationConfiguration
- processorInstance
+ BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
}
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 190f4e891..7f150f5f3 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import io.mockk.coEvery
import io.mockk.every
+import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -27,13 +28,23 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
@@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
+ classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
properties =
[
- "spring.jpa.show-sql=true",
- "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
"spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
"blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
@@ -70,7 +80,11 @@ import kotlin.test.assertTrue
// To send initial test message
"blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
]
)
open class MessagePrioritizationConsumerTest {
@@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Autowired
- lateinit var messagePrioritizationService: MessagePrioritizationService
-
- @Autowired
- lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
@Autowired
- lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
@Before
fun setup() {
@@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest {
@Test
fun testMessagePrioritizationService() {
runBlocking {
- assertTrue(
- ::messagePrioritizationService.isInitialized,
- "failed to initialize messagePrioritizationService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
log.info("**************** without Correlation **************")
/** Checking without correlation */
@@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService, mockk()
)
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
@@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest {
@Test
fun testSchedulerService() {
runBlocking {
- val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
- assertTrue(
- ::messagePrioritizationSchedulerService.isInitialized,
- "failed to initialize messagePrioritizationSchedulerService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationSchedulerService =
+ MessagePrioritizationSchedulerService(messagePrioritizationService)
launch {
- messagePrioritizationSchedulerService.startScheduling(configuration)
+ messagePrioritizationSchedulerService.startScheduling()
}
launch {
/** To debug increase the delay time */
@@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest {
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
- fun testMessagePrioritizationConsumer() {
+ fun testKafkaMessagePrioritizationConsumer() {
runBlocking {
- messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val kafkaMessagePrioritizationService =
+ SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+ kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+ val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+ messagePrioritizationStateService,
+ kafkaMessagePrioritizationService
+ )
+
+ // Register the processor
+ BluePrintDependencyService.registerSingleton(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ defaultMessagePrioritizeProcessor
+ )
+
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService,
+ kafkaMessagePrioritizationService
+ )
+ messagePrioritizationConsumer.startConsuming(configuration)
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
@@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest {
messagePrioritizationConsumer.shutDown()
}
}
+
+ /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+ * Start :
+ * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ * */
+ // @Test
+ fun testNatsMessagePrioritizationConsumer() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+ val inputSubject =
+ NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+ val natsMessagePrioritizationService =
+ SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+ natsMessagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationConsumer =
+ NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+ messagePrioritizationConsumer.startConsuming()
+
+ /** Send sample message with every 1 sec */
+ val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(200)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+ }
+ delay(3000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 0285079ad..22c399608 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -17,10 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
@@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
@Service
open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
SampleMessagePrioritizationService(messagePrioritizationStateService)
-
-/** For Kafka Consumer **/
-@Service
-open class TestMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
-) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
-
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor(
- messagePrioritizationStateService: MessagePrioritizationStateService,
- messagePrioritizationService: MessagePrioritizationService
-) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)