summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-01-16 11:21:50 -0500
committerBrinda Santh <bs2796@att.com>2020-01-16 13:58:05 -0500
commit8029f8e5332f107267ec11293c3099e54e87c67b (patch)
tree5341184c90a8a0013ea207dae2ed59d1ba3fa80b
parentf52cf7ce4451fd5fa1bbc6cf30e5b2a0acab7276 (diff)
Prioritization Optional NATS consumer support
Add prioritization NATS consumer service and configuration data beans. Optimizing message prioritization service interface. Added Integration testing for NATS simulation. Updated sample docker compose for NATS support Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: Icd21e5e2ab7b64d6e6e4b0610599ca947555ee15
-rw-r--r--ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml19
-rwxr-xr-xms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml21
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt84
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt6
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt13
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt)15
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt85
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt91
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt45
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt8
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt82
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt14
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt10
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt133
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt15
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt9
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt10
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt45
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt53
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt2
26 files changed, 678 insertions, 119 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
index 020038c26..7159534ba 100644
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -22,13 +22,17 @@ services:
image: nats-streaming:latest
container_name: nats
hostname: nats
- command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0"
+ command: "-cid cds-cluster --auth tokenAuth -store file -dir /opt/app/onap/nats/store --cluster_node_id nats-0"
networks:
- cds-network
ports:
- "8222:8222"
- "4222:4222"
restart: always
+ volumes:
+ - target: /opt/app/onap/nats/store
+ type: volume
+ source: nats-store
cds-controller-0:
depends_on:
- db
@@ -57,6 +61,7 @@ services:
CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ NATS_CLUSTER_ID: cds-cluster
NATS_HOSTS: nats://nats:4222
APPLICATIONNAME: cds-controller
BUNDLEVERSION: 1.0.0
@@ -90,6 +95,7 @@ services:
CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+ NATS_CLUSTER_ID: cds-cluster
NATS_HOSTS: nats://nats:4222
APPLICATIONNAME: resource-resolution
BUNDLEVERSION: 1.0.0
@@ -116,6 +122,7 @@ services:
CLUSTER_ID: cds-cluster
CLUSTER_NODE_ID: py-executor-0
CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0
+ NATS_CLUSTER_ID: cds-cluster
NATS_HOSTS: nats://nats:4222
APPLICATIONNAME: py-executor
BUNDLEVERSION: 1.0.0
@@ -132,6 +139,12 @@ volumes:
type: none
device: /opt/app/cds/mysql/data
o: bind
+ nats-store:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/nats/nats-0/store
+ o: bind
blueprints-deploy:
driver: local
driver_opts:
@@ -142,13 +155,13 @@ volumes:
driver: local
driver_opts:
type: none
- device: /opt/app/cds/cds-controller/config
+ device: /opt/app/cds/cds-controller/cds-controller-0/config
o: bind
resource-resolution-config:
driver: local
driver_opts:
type: none
- device: /opt/app/cds/resource-resolution/config
+ device: /opt/app/cds/resource-resolution/resource-resolution-0/config
o: bind
networks:
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index 20b17bc90..8f2a78639 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -18,6 +18,21 @@ services:
MYSQL_DATABASE: sdnctl
MYSQL_USER: sdnctl
MYSQL_PASSWORD: sdnctl
+ nats:
+ image: nats-streaming:latest
+ container_name: nats
+ hostname: nats
+ command: "-cid cds-cluster --auth tokenAuth -store file -dir /opt/app/onap/nats/store --cluster_node_id nats"
+ networks:
+ - cds-network
+ ports:
+ - "8222:8222"
+ - "4222:4222"
+ restart: always
+ volumes:
+ - target: /opt/app/onap/nats/store
+ type: volume
+ source: nats-store
cds-controller-default:
depends_on:
- db
@@ -89,6 +104,12 @@ volumes:
type: none
device: /opt/app/cds/mysql/data
o: bind
+ nats-store:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/nats/store
+ o: bind
blueprints-deploy:
driver: local
driver_opts:
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index 8345df523..424929b82 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable {
lateinit var shutDownConfiguration: ShutDownConfiguration
lateinit var cleanConfiguration: CleanConfiguration
var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+ var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration
}
open class KafkaConfiguration : Serializable {
@@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable {
lateinit var outputTopic: String // Publish Configuration Selector
}
+open class NatsConfiguration : Serializable {
+ lateinit var connectionSelector: String // Consumer Configuration Selector
+ lateinit var inputSubject: String // Publish Configuration Selector
+ lateinit var expiredSubject: String // Publish Configuration Selector
+ lateinit var outputSubject: String // Publish Configuration Selector
+}
+
open class ExpiryConfiguration : Serializable {
var frequencyMilli: Long = 30000L
var maxPollRecord: Int = 1000
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
index 464f97a88..dfe516953 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -16,21 +16,22 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
-import org.apache.kafka.streams.processor.ProcessorContext
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
interface MessagePrioritizationService {
- fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+ fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration)
+
+ fun getConfiguration(): PrioritizationConfiguration
suspend fun prioritize(messagePrioritization: MessagePrioritization)
/** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
suspend fun output(messages: List<MessagePrioritization>)
- /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */
- suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration)
+ /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */
+ suspend fun updateExpiredMessages()
- /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */
- suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration)
+ /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */
+ suspend fun cleanExpiredMessage()
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
new file mode 100644
index 000000000..112a80379
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractKafkaMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractKafkaMessagePrioritizationService::class)
+
+ lateinit var processorContext: ProcessorContext
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+ processorContext.forward(
+ updatedMessage.id,
+ updatedMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+ check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (expiredIds != null && expiredIds.isNotEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ processorContext.forward(
+ expiredMessage.id, expiredMessage,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
index 656646ff7..d4f8470c8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
@@ -17,17 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
- private val log = logger(AbstractMessagePrioritizeProcessor::class)
-
- lateinit var prioritizationConfiguration: PrioritizationConfiguration
-
override fun init(processorContext: ProcessorContext) {
this.processorContext = processorContext
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
index 624a69fd4..1b0612492 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
open class DefaultMessagePrioritizeProcessor(
private val messagePrioritizationStateService: MessagePrioritizationStateService,
- private val messagePrioritizationService: MessagePrioritizationService
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
private val log = logger(DefaultMessagePrioritizeProcessor::class)
@@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor(
val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
?: throw BluePrintProcessorException("failed to convert")
try {
- messagePrioritizationService.prioritize(messagePrioritize)
+ kafkaMessagePrioritizationService.prioritize(messagePrioritize)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor(
override fun init(context: ProcessorContext) {
super.init(context)
/** Set Configuration and Processor Context to messagePrioritizationService */
- messagePrioritizationService.setKafkaProcessorContext(processorContext)
+ if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) {
+ kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext)
+ } else {
+ throw BluePrintProcessorException(
+ "messagePrioritizationService is not instance of " +
+ "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}"
+ )
+ }
}
override fun close() {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
index fb7cfd110..d5ec0233a 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
@@ -30,13 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
-open class MessagePrioritizationConsumer(
- private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+open class KafkaMessagePrioritizationConsumer(
+ private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+ private val kafkaMessagePrioritizationService: MessagePrioritizationService
) {
- private val log = logger(MessagePrioritizationConsumer::class)
+ private val log = logger(KafkaMessagePrioritizationConsumer::class)
- lateinit var streamingConsumerService: BlueprintMessageConsumerService
+ private lateinit var streamingConsumerService: BlueprintMessageConsumerService
open fun consumerService(selector: String): BlueprintMessageConsumerService {
return bluePrintMessageLibPropertyService
@@ -67,8 +69,7 @@ open class MessagePrioritizationConsumer(
topology.addProcessor(
MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
bluePrintProcessorSupplier<ByteArray, ByteArray>(
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- prioritizationConfiguration
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
),
MessagePrioritizationConstants.SOURCE_INPUT
)
@@ -100,7 +101,7 @@ open class MessagePrioritizationConsumer(
}
suspend fun shutDown() {
- if (streamingConsumerService != null) {
+ if (::streamingConsumerService.isInitialized) {
streamingConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
new file mode 100644
index 000000000..502a7822d
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractNatsMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(AbstractNatsMessagePrioritizationService::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+
+ override suspend fun output(messages: List<MessagePrioritization>) {
+ log.info("$$$$$ received in output processor id(${messages.ids()})")
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject
+ messages.forEach { message ->
+ val updatedMessage =
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ updatedMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+
+ override suspend fun updateExpiredMessages() {
+ checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+ check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject
+ val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+ try {
+ val fetchMessages = messagePrioritizationStateService
+ .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+ val expiredIds = fetchMessages?.ids()
+ if (!expiredIds.isNullOrEmpty()) {
+ messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+ fetchMessages.forEach { expiredMessage ->
+ expiredMessage.state = MessageState.EXPIRED.name
+ /** send to the output subject */
+ bluePrintNatsService.publish(
+ NatsClusterUtils.currentApplicationSubject(outputSubject),
+ expiredMessage.asJsonType().asByteArray()
+ )
+ }
+ }
+ } catch (e: Exception) {
+ log.error("failed in updating expired messages", e)
+ } finally {
+ MessageProcessorUtils.prioritizationUnLock(clusterLock)
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
new file mode 100644
index 000000000..20da2c28c
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.Subscription
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsMessagePrioritizationConsumer(
+ private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
+ private val natsMessagePrioritizationService: MessagePrioritizationService
+) {
+ private val log = logger(NatsMessagePrioritizationConsumer::class)
+
+ lateinit var bluePrintNatsService: BluePrintNatsService
+ private lateinit var subscription: Subscription
+
+ suspend fun startConsuming() {
+ val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration()
+ val natsConfiguration = prioritizationConfiguration.natsConfiguration
+ ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration")
+
+ check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) {
+ "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService."
+ }
+ bluePrintNatsService = consumerService(natsConfiguration.connectionSelector)
+ natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService
+ val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)
+ val loadBalanceGroup = ClusterUtils.applicationName()
+ val messageHandler = createMessageHandler()
+ val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject))
+ subscription = bluePrintNatsService.loadBalanceSubscribe(
+ inputSubject,
+ loadBalanceGroup,
+ messageHandler,
+ subscriptionOptions
+ )
+ log.info(
+ "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)."
+ )
+ }
+
+ suspend fun shutDown() {
+ if (::subscription.isInitialized) {
+ subscription.unsubscribe()
+ }
+ log.info("Nats prioritization consumer listener shutdown complete")
+ }
+
+ private fun consumerService(selector: String): BluePrintNatsService {
+ return bluePrintNatsLibPropertyService.bluePrintNatsService(selector)
+ }
+
+ private fun createMessageHandler(): MessageHandler {
+ return MessageHandler { message ->
+ try {
+ val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java)
+ runBlocking {
+ natsMessagePrioritizationService.prioritize(messagePrioritization)
+ }
+ } catch (e: Exception) {
+ log.error("failed to process prioritize message", e)
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index 931403200..a6963d83f 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -16,14 +16,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
@@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService(
private val log = logger(AbstractMessagePrioritizationService::class)
- var processorContext: ProcessorContext? = null
+ lateinit var prioritizationConfiguration: PrioritizationConfiguration
- override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
- this.processorContext = processorContext
+ override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+ this.prioritizationConfiguration = prioritizationConfiguration
+ }
+
+ override fun getConfiguration(): PrioritizationConfiguration {
+ return this.prioritizationConfiguration
}
override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
/** Get the cluster lock for message group */
val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
@@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService(
override suspend fun output(messages: List<MessagePrioritization>) {
log.info("$$$$$ received in output processor id(${messages.ids()})")
messages.forEach { message ->
- val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
- /** Check for Kafka Processing, If yes, then send to the output topic */
- if (this.processorContext != null) {
- processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
+ messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
}
}
- override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+ override suspend fun updateExpiredMessages() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
try {
val fetchMessages = messagePrioritizationStateService
.getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
val expiredIds = fetchMessages?.ids()
- if (expiredIds != null && expiredIds.isNotEmpty()) {
+ if (!expiredIds.isNullOrEmpty()) {
messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
- if (processorContext != null) {
- fetchMessages.forEach { expired ->
- expired.state = MessageState.EXPIRED.name
- processorContext!!.forward(
- expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- }
- }
}
} catch (e: Exception) {
log.error("failed in updating expired messages", e)
@@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService(
}
}
- override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+ override suspend fun cleanExpiredMessage() {
+ check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
try {
messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
index b1c1fb15f..2f08c1c34 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
@@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService(
}
*/
- open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+ open suspend fun startScheduling() {
+ val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
+
log.info("Starting Prioritization Scheduler Service...")
GlobalScope.launch {
expiryScheduler(prioritizationConfiguration)
@@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService(
withContext(Dispatchers.Default) {
while (keepGoing) {
try {
- messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
+ messagePrioritizationService.updateExpiredMessages()
delay(expiryConfiguration.frequencyMilli)
} catch (e: Exception) {
log.error("failed in prioritization expiry scheduler", e)
@@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService(
withContext(Dispatchers.Default) {
while (keepGoing) {
try {
- messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
+ messagePrioritizationService.cleanExpiredMessage()
delay(cleanConfiguration.frequencyMilli)
} catch (e: Exception) {
log.error("failed in prioritization clean scheduler", e)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
index b7d878e4a..305e64ba4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -16,11 +16,13 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
AbstractMessagePrioritizationService(messagePrioritizationStateService) {
- private val log = logger(DefaultMessagePrioritizeProcessor::class)
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) {
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) {
/** Child overriding this implementation , if necessary */
override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ sampleMessagePrioritizationHandler.handleAggregation(messages)
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+ this, messagePrioritizationStateService
+ )
+ return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+ }
+}
+
+class SampleMessagePrioritizationHandler(
+ private val messagePrioritizationService: MessagePrioritizationService,
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) {
+
+ private val log = logger(SampleMessagePrioritizationHandler::class)
+
+ suspend fun handleAggregation(messages: List<MessagePrioritization>) {
log.info("messages(${messages.ids()}) aggregated")
/** Sequence based on Priority and Updated Date */
val sequencedMessage = messages.orderByHighestPriority()
/** Update all messages to aggregated state */
- messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name)
- output(sequencedMessage)
+ messagePrioritizationStateService.setMessagesState(
+ sequencedMessage.ids(),
+ MessageState.AGGREGATED.name
+ )
+ messagePrioritizationService.output(sequencedMessage)
}
- /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
- * otherwise correlation happens with group and correlationId */
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
return when (messagePrioritization.group) {
/** Dummy Implementation, This can also be read from file and stored as cached map **/
"group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ "pass-typed" -> arrayListOf(messagePrioritization.type)
else -> null
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
index e497ef144..2c4ae30da 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
@@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import java.util.Calendar
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
import java.util.Date
import java.util.UUID
@@ -35,6 +37,12 @@ object MessagePrioritizationSample {
outputTopic = "prioritize-output-topic"
expiredTopic = "prioritize-expired-topic"
}
+ natsConfiguration = NatsConfiguration().apply {
+ connectionSelector = "cds-controller"
+ inputSubject = "prioritize-input"
+ outputSubject = "prioritize-output"
+ expiredSubject = "prioritize-expired"
+ }
expiryConfiguration = ExpiryConfiguration().apply {
frequencyMilli = 10000L
maxPollRecord = 2000
@@ -66,9 +74,7 @@ object MessagePrioritizationSample {
}
private fun currentDatePlusDays(days: Int): Date {
- val calender = Calendar.getInstance()
- calender.add(Calendar.DATE, days)
- return calender.time
+ return controllerDate().addDate(days)
}
fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index 18b3e4dd7..9100fb51c 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
@@ -77,14 +76,11 @@ object MessageProcessorUtils {
}
}
- /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
- fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
- ProcessorSupplier<K, V> {
+ /** Get the Kafka Supplier for processor lookup [name] **/
+ fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
- val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
- processorInstance.prioritizationConfiguration = prioritizationConfiguration
- processorInstance
+ BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
}
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 190f4e891..7f150f5f3 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import io.mockk.coEvery
import io.mockk.every
+import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -27,13 +28,23 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
@@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@DirtiesContext
@ContextConfiguration(
- classes = [BluePrintMessageLibConfiguration::class,
+ classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
)
@TestPropertySource(
properties =
[
- "spring.jpa.show-sql=true",
- "spring.jpa.properties.hibernate.show_sql=true",
+ "spring.jpa.show-sql=false",
+ "spring.jpa.properties.hibernate.show_sql=false",
"spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
"blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
@@ -70,7 +80,11 @@ import kotlin.test.assertTrue
// To send initial test message
"blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+ "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+
+ "blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
]
)
open class MessagePrioritizationConsumerTest {
@@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Autowired
- lateinit var messagePrioritizationService: MessagePrioritizationService
-
- @Autowired
- lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
@Autowired
- lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+ lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
@Before
fun setup() {
@@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest {
@Test
fun testMessagePrioritizationService() {
runBlocking {
- assertTrue(
- ::messagePrioritizationService.isInitialized,
- "failed to initialize messagePrioritizationService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
log.info("**************** without Correlation **************")
/** Checking without correlation */
@@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService, mockk()
)
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
@@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest {
@Test
fun testSchedulerService() {
runBlocking {
- val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
- assertTrue(
- ::messagePrioritizationSchedulerService.isInitialized,
- "failed to initialize messagePrioritizationSchedulerService"
- )
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val messagePrioritizationService =
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+ messagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationSchedulerService =
+ MessagePrioritizationSchedulerService(messagePrioritizationService)
launch {
- messagePrioritizationSchedulerService.startScheduling(configuration)
+ messagePrioritizationSchedulerService.startScheduling()
}
launch {
/** To debug increase the delay time */
@@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest {
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
- fun testMessagePrioritizationConsumer() {
+ fun testKafkaMessagePrioritizationConsumer() {
runBlocking {
- messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ val kafkaMessagePrioritizationService =
+ SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+ kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+ val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+ messagePrioritizationStateService,
+ kafkaMessagePrioritizationService
+ )
+
+ // Register the processor
+ BluePrintDependencyService.registerSingleton(
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+ defaultMessagePrioritizeProcessor
+ )
+
+ val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService,
+ kafkaMessagePrioritizationService
+ )
+ messagePrioritizationConsumer.startConsuming(configuration)
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
@@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest {
messagePrioritizationConsumer.shutDown()
}
}
+
+ /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+ * Start :
+ * nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+ * */
+ // @Test
+ fun testNatsMessagePrioritizationConsumer() {
+ runBlocking {
+ val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+ assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+ val inputSubject =
+ NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+ val natsMessagePrioritizationService =
+ SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+ natsMessagePrioritizationService.setConfiguration(configuration)
+
+ val messagePrioritizationConsumer =
+ NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+ messagePrioritizationConsumer.startConsuming()
+
+ /** Send sample message with every 1 sec */
+ val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+ launch {
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(100)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(200)
+ bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+ }
+ }
+ delay(3000)
+ messagePrioritizationConsumer.shutDown()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 0285079ad..22c399608 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -17,10 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
@@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
@Service
open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
SampleMessagePrioritizationService(messagePrioritizationStateService)
-
-/** For Kafka Consumer **/
-@Service
-open class TestMessagePrioritizationConsumer(
- bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
-) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
-
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor(
- messagePrioritizationStateService: MessagePrioritizationStateService,
- messagePrioritizationService: MessagePrioritizationService
-) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt
index df3bde1b4..e8457283e 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.springframework.context.ApplicationContext
+import org.springframework.context.ConfigurableApplicationContext
import kotlin.reflect.KClass
/**
@@ -36,6 +37,14 @@ object BluePrintDependencyService {
BluePrintDependencyService.applicationContext = applicationContext
}
+ /** Used to inject [instance] into spring application context for the [key],
+ * Use this method only for testing
+ * */
+ fun registerSingleton(key: String, instance: Any) {
+ val configurableApplicationContext = applicationContext as ConfigurableApplicationContext
+ configurableApplicationContext.beanFactory.registerSingleton(key, instance)
+ }
+
inline fun <reified T> instance(name: String): T {
return applicationContext.getBean(name) as? T
?: throw BluePrintProcessorException("failed to get instance($name)")
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
index b52cd711b..7fe955b03 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
@@ -27,6 +27,10 @@ object ClusterUtils {
return ip.hostName
}
+ fun applicationName(): String {
+ return BluePrintConstants.APP_NAME
+ }
+
fun clusterId(): String {
return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
index 147d360ba..8d5d846d7 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
@@ -41,6 +41,7 @@ class NatsLibConstants {
const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service"
const val DEFULT_NATS_SELECTOR = "cds-controller"
const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats."
+ const val PROPERTY_NATS_CLUSTER_ID = "NATS_CLUSTER_ID"
const val TYPE_TOKEN_AUTH = "token-auth"
const val TYPE_TLS_AUTH = "tls-auth"
}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
index 9767ac29d..74897f322 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
@@ -16,11 +16,12 @@
package org.onap.ccsdk.cds.blueprintsprocessor.nats
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
open class NatsConnectionProperties {
lateinit var type: String
- var clusterId: String = ClusterUtils.clusterId()
+ var clusterId: String = NatsClusterUtils.clusterId()
var clientId: String = ClusterUtils.clusterNodeId()
lateinit var host: String
/** Rest endpoint selector to access Monitoring API */
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
index faf171528..18d0639f1 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
@@ -50,15 +50,13 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic
JacksonUtils.readValue(jsonNode, TLSAuthNatsConnectionProperties::class.java)!!
}
else -> {
- throw BluePrintProcessorException("Nats type($type) not supported")
+ throw BluePrintProcessorException("NATS type($type) not supported")
}
}
}
fun natsConnectionProperties(prefix: String): NatsConnectionProperties {
- val type = bluePrintPropertiesService.propertyBeanType(
- "$prefix.type", String::class.java
- )
+ val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
NatsLibConstants.TYPE_TOKEN_AUTH -> {
tokenAuthNatsConnectionProperties(prefix)
@@ -67,7 +65,7 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic
tlsAuthNatsConnectionProperties(prefix)
}
else -> {
- throw BluePrintProcessorException("Grpc type($type) not supported")
+ throw BluePrintProcessorException("NATS type($type) not supported")
}
}
}
@@ -90,7 +88,7 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic
TLSAuthNatsService(natsConnectionProperties)
}
else -> {
- throw BluePrintProcessorException("couldn't get nats service for properties $natsConnectionProperties")
+ throw BluePrintProcessorException("couldn't get NATS service for properties $natsConnectionProperties")
}
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
index 60b7934ba..43a43bc03 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
@@ -21,15 +21,23 @@ import io.nats.client.Options
import io.nats.streaming.NatsStreaming
import io.nats.streaming.StreamingConnection
import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthNatsConnectionProperties) :
BluePrintNatsService {
+ private val log = logger(TokenAuthNatsService::class)
+
lateinit var streamingConnection: StreamingConnection
override suspend fun connection(): StreamingConnection {
if (!::streamingConnection.isInitialized) {
+ log.info(
+ "NATS connection requesting for cluster(${natsConnectionProperties.clusterId}) with" +
+ "clientId($natsConnectionProperties.clientId)"
+ )
+
val serverList = natsConnectionProperties.host.splitCommaAsList()
val options = Options.Builder()
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt
new file mode 100644
index 000000000..a7726a14b
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+object NatsClusterUtils {
+
+ fun clusterId(): String {
+ return System.getenv(NatsLibConstants.PROPERTY_NATS_CLUSTER_ID)
+ ?: ClusterUtils.clusterId()
+ }
+
+ fun applicationSubject(appName: String, subject: String): String {
+ return "$appName.$subject"
+ }
+
+ fun currentApplicationSubject(subject: String): String {
+ return "${BluePrintConstants.APP_NAME}.$subject"
+ }
+
+ fun currentNodeDurable(subject: String): String {
+ return "${ClusterUtils.clusterNodeId()}-$subject"
+ }
+
+ fun applicationLoadBalanceGroup(): String {
+ return "${BluePrintConstants.APP_NAME}"
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt
new file mode 100644
index 000000000..ba993d96a
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertTrue
+
+@RunWith(SpringRunner::class)
+@ContextConfiguration(
+ classes = [BluePrintNatsLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+ properties =
+ ["blueprintsprocessor.nats.cds-controller.type=token-auth",
+ "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+ "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
+ ]
+)
+class BluePrintNatsLibPropertyServiceTest {
+
+ @Autowired
+ lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
+
+ @Test
+ fun testNatsProperties() {
+ assertTrue(::bluePrintNatsLibPropertyService.isInitialized)
+ bluePrintNatsLibPropertyService.bluePrintNatsService(NatsLibConstants.DEFULT_NATS_SELECTOR)
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
index 549be6481..721828ac9 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
@@ -84,7 +84,7 @@ class BluePrintNatsServiceTest {
* Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
*/
// @Test
- fun localTntegrationTest() {
+ fun localIntegrationTest() {
runBlocking {
val connectionProperties = TokenAuthNatsConnectionProperties().apply {