aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/message-prioritizaion/src
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-01-16 11:21:50 -0500
committerBrinda Santh <bs2796@att.com>2020-01-16 13:58:05 -0500
commit8029f8e5332f107267ec11293c3099e54e87c67b (patch)
tree5341184c90a8a0013ea207dae2ed59d1ba3fa80b /ms/blueprintsprocessor/functions/message-prioritizaion/src
parentf52cf7ce4451fd5fa1bbc6cf30e5b2a0acab7276 (diff)
Prioritization Optional NATS consumer support
Add prioritization NATS consumer service and configuration data beans. Optimizing message prioritization service interface. Added Integration testing for NATS simulation. Updated sample docker compose for NATS support Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: Icd21e5e2ab7b64d6e6e4b0610599ca947555ee15
Diffstat (limited to 'ms/blueprintsprocessor/functions/message-prioritizaion/src')
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt84
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt6
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt)15
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt85
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt91
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt45
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt82
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt14
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt10
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt133
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt15
15 files changed, 514 insertions, 108 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index 8345df523..424929b82 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable {
lateinit var shutDownConfiguration: ShutDownConfiguration
lateinit var cleanConfiguration: CleanConfiguration
var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+ var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration
}
open class KafkaConfiguration : Serializable {
@@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable {
lateinit var outputTopic: String // Publish Configuration Selector
}
+open class NatsConfiguration : Serializable {
+ lateinit var connectionSelector: String // Consumer Configuration Selector
+ lateinit var inputSubject: String // Publish Configuration Selector
+ lateinit var expiredSubject: String // Publish Configuration Selector
+ lateinit var outputSubject: String // Publish Configuration Selector
+}
+
open class ExpiryConfiguration : Serializable {
var frequencyMilli: Long = 30000L
var maxPollRecord: Int = 1000
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
index 464f97a88..dfe516953 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -16,21 +16,22 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-import org.apache.kafka.streams.processor.ProcessorContext
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
interface MessagePrioritizationService {
- fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+ fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration)
+
+ fun getConfiguration(): PrioritizationConfiguration
suspend fun prioritize(messagePrioritization: MessagePrioritization)
/** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
suspend fun output(messages: List<MessagePrioritization>)
- /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */
- suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration)
+ /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */
+ suspend fun updateExpiredMessages()
- /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */
- suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration)
+ /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */
+ suspend fun cleanExpiredMessage()
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
new file mode 100644
index 000000000..112a80379
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractKafkaMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractKafkaMessagePrioritizationService::class)
+
+ lateinit var processorContext: ProcessorContext
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ processorContext.forward(
+ updatedMessage.id,
+ updatedMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ processorContext.forward(
+ expiredMessage.id, expiredMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
index 656646ff7..d4f8470c8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
@@ -17,17 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
- private val log = logger(AbstractMessagePrioritizeProcessor::class)
-
- lateinit var prioritizationConfiguration: PrioritizationConfiguration
-
override fun init(processorContext: ProcessorContext) {
this.processorContext = processorContext
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
index 624a69fd4..1b0612492 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
open class DefaultMessagePrioritizeProcessor(
private val messagePrioritizationStateService: MessagePrioritizationStateService,
- private val messagePrioritizationService: MessagePrioritizationService
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
private val log = logger(DefaultMessagePrioritizeProcessor::class)
@@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor(
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
- messagePrioritizationService.prioritize(messagePrioritize)
+ kafkaMessagePrioritizationService.prioritize(messagePrioritize)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor(
override fun init(context: ProcessorContext) {
super.init(context)
/** Set Configuration and Processor Context to messagePrioritizationService */
- messagePrioritizationService.setKafkaProcessorContext(processorContext)
+ if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) {
+ kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext)
+ } else {
+ throw BluePrintProcessorException(
+ "messagePrioritizationService is not instance of " +
+ "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}"
+ )
+ }
}
override fun close() {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
index fb7cfd110..d5ec0233a 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
@@ -30,13 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
-open class MessagePrioritizationConsumer(
- private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+open class KafkaMessagePrioritizationConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
) {
- private val log = logger(MessagePrioritizationConsumer::class)
+ private val log = logger(KafkaMessagePrioritizationConsumer::class)
- lateinit var streamingConsumerService: BlueprintMessageConsumerService
+ private lateinit var streamingConsumerService: BlueprintMessageConsumerService
open fun consumerService(selector: String): BlueprintMessageConsumerService {
return bluePrintMessageLibPropertyService
@@ -67,8 +69,7 @@ open class MessagePrioritizationConsumer(
topology.addProcessor(
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
bluePrintProcessorSupplier<ByteArray, ByteArray>(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- prioritizationConfiguration
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
),
MessagePrioritizationConstants.SOURCE_INPUT
)
@@ -100,7 +101,7 @@ open class MessagePrioritizationConsumer(
}
suspend fun shutDown() {
- if (streamingConsumerService != null) {
+ if (::streamingConsumerService.isInitialized) {
streamingConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
new file mode 100644
index 000000000..502a7822d
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractNatsMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractNatsMessagePrioritizationService::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ updatedMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (!expiredIds.isNullOrEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ expiredMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
new file mode 100644
index 000000000..20da2c28c
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.Subscription
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsMessagePrioritizationConsumer(
+ private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
+ private val natsMessagePrioritizationService: MessagePrioritizationService
+) {
+ private val log = logger(NatsMessagePrioritizationConsumer::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+ private lateinit var subscription: Subscription
+
+ suspend fun startConsuming() {
+ val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration()
+ val natsConfiguration = prioritizationConfiguration.natsConfiguration
+ ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration")
+
+ check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) {
+ "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService."
+ }
+ bluePrintNatsService = consumerService(natsConfiguration.connectionSelector)
+ natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService
+ val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)
+ val loadBalanceGroup = ClusterUtils.applicationName()
+ val messageHandler = createMessageHandler()
+ val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject))
+ subscription = bluePrintNatsService.loadBalanceSubscribe(
+ inputSubject,
+ loadBalanceGroup,
+ messageHandler,
+ subscriptionOptions
+ )
+ log.info(
+ "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)."
+ )
+ }
+
+ suspend fun shutDown() {
+ if (::subscription.isInitialized) {
+ subscription.unsubscribe()
+ }
+ log.info("Nats prioritization consumer listener shutdown complete")
+ }
+
+ private fun consumerService(selector: String): BluePrintNatsService {
+ return bluePrintNatsLibPropertyService.bluePrintNatsService(selector)
+ }
+
+ private fun createMessageHandler(): MessageHandler {
+ return MessageHandler { message ->
+ try {
+ val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java)
+ runBlocking {
+ natsMessagePrioritizationService.prioritize(messagePrioritization)
+ }
+ } catch (e: Exception) {
+ log.error("failed to process prioritize message", e)
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index 931403200..a6963d83f 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -16,14 +16,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
@@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService(
private val log = logger(AbstractMessagePrioritizationService::class)
- var processorContext: ProcessorContext? = null
+ lateinit var prioritizationConfiguration: PrioritizationConfiguration
- override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
- this.processorContext = processorContext
+ override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+ this.prioritizationConfiguration = prioritizationConfiguration
+ }
+
+ override fun getConfiguration(): PrioritizationConfiguration {
+ return this.prioritizationConfiguration
}
override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
/** Get the cluster lock for message group */
val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
@@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService(
override suspend fun output(messages: List<MessagePrioritization>) {
log.info("$$$$$ received in output processor id(${messages.ids()})")
messages.forEach { message ->
- val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
- /** Check for Kafka Processing, If yes, then send to the output topic */
- if (this.processorContext != null) {
- processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
}
}
- override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+ override suspend fun updateExpiredMessages() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
try {
val fetchMessages = messagePrioritizationStateService
.getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
val expiredIds = fetchMessages?.ids()
- if (expiredIds != null && expiredIds.isNotEmpty()) {
+ if (!expiredIds.isNullOrEmpty()) {
messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- if (processorContext != null) {
- fetchMessages.forEach { expired ->
- expired.state = MessageState.EXPIRED.name
- processorContext!!.forward(
- expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
}
} catch (e: Exception) {
log.error("failed in updating expired messages", e)
@@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService(
}
}
- override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+ override suspend fun cleanExpiredMessage() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
try {
messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
index b1c1fb15f..2f08c1c34 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
@@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService(
}
*/
- open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+ open suspend fun startScheduling() {
+ val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
+
log.info("Starting Prioritization Scheduler Service...")
GlobalScope.launch {
expiryScheduler(prioritizationConfiguration)
@@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService(
withContext(Dispatchers.Default) {
while (keepGoing) {
try {
- messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
+ messagePrioritizationService.updateExpiredMessages()
delay(expiryConfiguration.frequencyMilli)
} catch (e: Exception) {
log.error("failed in prioritization expiry scheduler", e)
@@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService(
withContext(Dispatchers.Default) {
while (keepGoing) {
try {
- messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
+ messagePrioritizationService.cleanExpiredMessage()
delay(cleanConfiguration.frequencyMilli)
} catch (e: Exception) {
log.error("failed in prioritization clean scheduler", e)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
index b7d878e4a..305e64ba4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -16,11 +16,13 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
AbstractMessagePrioritizationService(messagePrioritizationStateService) {
- private val log = logger(DefaultMessagePrioritizeProcessor::class)
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) {
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) {
/** Child overriding this implementation , if necessary */
override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+class SampleMessagePrioritizationHandler(
+ private val messagePrioritizationService: MessagePrioritizationService,
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) {
+
+ private val log = logger(SampleMessagePrioritizationHandler::class)
+
+ suspend fun handleAggregation(messages: List<MessagePrioritization>) {
log.info("messages(${messages.ids()}) aggregated")
/** Sequence based on Priority and Updated Date */
val sequencedMessage = messages.orderByHighestPriority()
/** Update all messages to aggregated state */
- messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name)
- output(sequencedMessage)
+ messagePrioritizationStateService.setMessagesState(
+ sequencedMessage.ids(),
+ MessageState.AGGREGATED.name
+ )
+ messagePrioritizationService.output(sequencedMessage)
}
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return when (messagePrioritization.group) {
/** Dummy Implementation, This can also be read from file and stored as cached map **/
"group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ "pass-typed" -> arrayListOf(messagePrioritization.type)
else -> null
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
index e497ef144..2c4ae30da 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import java.util.Calendar
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
import java.util.Date
import java.util.UUID
@@ -35,6 +37,12 @@ object MessagePrioritizationSample {
outputTopic = "prioritize-output-topic"
expiredTopic = "prioritize-expired-topic"
}
+ natsConfiguration = NatsConfiguration().apply {
+ connectionSelector = "cds-controller"
+ inputSubject = "prioritize-input"
+ outputSubject = "prioritize-output"
+ expiredSubject = "prioritize-expired"
+ }
expiryConfiguration = ExpiryConfiguration().apply {
frequencyMilli = 10000L
maxPollRecord = 2000
@@ -66,9 +74,7 @@ object MessagePrioritizationSample {
}
private fun currentDatePlusDays(days: Int): Date {
- val calender = Calendar.getInstance()
- calender.add(Calendar.DATE, days)
- return calender.time
+ return controllerDate().addDate(days)
}
fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 18b3e4dd7..9100fb51c 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
@@ -77,14 +76,11 @@ object MessageProcessorUtils {
}
}
- /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
- fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
- ProcessorSupplier<K, V> {
+ /** Get the Kafka Supplier for processor lookup [name] **/
+ fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
- val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
- processorInstance.prioritizationConfiguration = prioritizationConfiguration
- processorInstance
+ BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
}
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 190f4e891..7f150f5f3 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import io.mockk.coEvery
import io.mockk.every
+import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -27,13 +28,23 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
@@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
+ classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
properties =
[
- "spring.jpa.show-sql=true",
- "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
"spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
"blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
@@ -70,7 +80,11 @@ import kotlin.test.assertTrue
// To send initial test message
"blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
]
)
open class MessagePrioritizationConsumerTest {
@@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Autowired
- lateinit var messagePrioritizationService: MessagePrioritizationService
-
- @Autowired
- lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
@Autowired
- lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
@Before
fun setup() {
@@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest {
@Test
fun testMessagePrioritizationService() {
runBlocking {
- assertTrue(
- ::messagePrioritizationService.isInitialized,
- "failed to initialize messagePrioritizationService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
log.info("**************** without Correlation **************")
/** Checking without correlation */
@@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService, mockk()
)
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
@@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest {
@Test
fun testSchedulerService() {
runBlocking {
- val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
- assertTrue(
- ::messagePrioritizationSchedulerService.isInitialized,
- "failed to initialize messagePrioritizationSchedulerService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationSchedulerService =
+ MessagePrioritizationSchedulerService(messagePrioritizationService)
launch {
- messagePrioritizationSchedulerService.startScheduling(configuration)
+ messagePrioritizationSchedulerService.startScheduling()
}
launch {
/** To debug increase the delay time */
@@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest {
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
- fun testMessagePrioritizationConsumer() {
+ fun testKafkaMessagePrioritizationConsumer() {
runBlocking {
- messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val kafkaMessagePrioritizationService =
+ SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+ kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+ val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+ messagePrioritizationStateService,
+ kafkaMessagePrioritizationService
+ )
+
+ // Register the processor
+ BluePrintDependencyService.registerSingleton(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ defaultMessagePrioritizeProcessor
+ )
+
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService,
+ kafkaMessagePrioritizationService
+ )
+ messagePrioritizationConsumer.startConsuming(configuration)
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
@@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest {
messagePrioritizationConsumer.shutDown()
}
}
+
+ /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+ * Start :
+ * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ * */
+ // @Test
+ fun testNatsMessagePrioritizationConsumer() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+ val inputSubject =
+ NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+ val natsMessagePrioritizationService =
+ SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+ natsMessagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationConsumer =
+ NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+ messagePrioritizationConsumer.startConsuming()
+
+ /** Send sample message with every 1 sec */
+ val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(200)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+ }
+ delay(3000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 0285079ad..22c399608 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -17,10 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
@@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
@Service
open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
SampleMessagePrioritizationService(messagePrioritizationStateService)
-
-/** For Kafka Consumer **/
-@Service
-open class TestMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
-) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
-
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor(
- messagePrioritizationStateService: MessagePrioritizationStateService,
- messagePrioritizationService: MessagePrioritizationService
-) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)