summaryrefslogtreecommitdiffstats
path: root/ms
diff options
context:
space:
mode:
Diffstat (limited to 'ms')
-rw-r--r--ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml64
-rwxr-xr-xms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml36
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/README.md5
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt3
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt29
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt68
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt15
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt17
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt)27
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt115
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt)38
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt)6
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt)2
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt)164
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt (renamed from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt)58
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt46
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt80
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt34
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt29
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt45
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt16
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt35
24 files changed, 591 insertions, 381 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
index a37089f10..020038c26 100644
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -9,7 +9,9 @@ services:
ports:
- "3306:3306"
volumes:
- - ~/vm_mysql:/var/lib/mysql
+ - target: /var/lib/mysql
+ type: volume
+ source: mysql-data
restart: always
environment:
MYSQL_ROOT_PASSWORD: sdnctl
@@ -45,8 +47,8 @@ services:
type: volume
source: blueprints-deploy
- target: /opt/app/onap/config
- type: bind
- source: ./config
+ type: volume
+ source: controller-config
environment:
# Same as hostname and container name
CLUSTER_ENABLED: "true"
@@ -79,8 +81,8 @@ services:
type: volume
source: blueprints-deploy
- target: /opt/app/onap/config
- type: bind
- source: ./config
+ type: volume
+ source: resource-resolution-config
environment:
CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
@@ -94,8 +96,60 @@ services:
APP_CONFIG_HOME: /opt/app/onap/config
STICKYSELECTORKEY:
ENVCONTEXT: dev
+ py-executor-0:
+ depends_on:
+ - db
+ - nats
+ image: onap/ccsdk-py-executor
+ container_name: py-executor-0
+ hostname: py-executor-0
+ networks:
+ - cds-network
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ environment:
+ CLUSTER_ID: cds-cluster
+ CLUSTER_NODE_ID: py-executor-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0
+ NATS_HOSTS: nats://nats:4222
+ APPLICATIONNAME: py-executor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ AUTH_TYPE: tls-auth
+ LOG_FILE: /opt/app/onap/logs/application.log
volumes:
+ mysql-data:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/mysql/data
+ o: bind
blueprints-deploy:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/blueprints/deploy
+ o: bind
+ controller-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/cds-controller/config
+ o: bind
+ resource-resolution-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/resource-resolution/config
+ o: bind
networks:
cds-network:
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index d87770286..20b17bc90 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -9,7 +9,9 @@ services:
ports:
- "3306:3306"
volumes:
- - ~/vm_mysql:/var/lib/mysql
+ - target: /var/lib/mysql
+ type: volume
+ source: mysql-data
restart: always
environment:
MYSQL_ROOT_PASSWORD: sdnctl
@@ -29,7 +31,12 @@ services:
- "9111:9111"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ - target: /opt/app/onap/config
+ type: volume
+ source: controller-config
environment:
APPLICATIONNAME: cds-controller
BUNDLEVERSION: 1.0.0
@@ -47,7 +54,9 @@ services:
- "50051:50051"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
py-executor-default:
depends_on:
- db
@@ -60,7 +69,9 @@ services:
- "50052:50052"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
environment:
APPLICATIONNAME: py-executor
BUNDLEVERSION: 1.0.0
@@ -72,7 +83,24 @@ services:
LOG_FILE: /opt/app/onap/logs/application.log
volumes:
+ mysql-data:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/mysql/data
+ o: bind
blueprints-deploy:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/blueprints/deploy
+ o: bind
+ controller-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/cds-controller/config
+ o: bind
networks:
cds-network:
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
index 482bbc2cc..cda43faca 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/README.md
@@ -3,7 +3,6 @@ To Delete Topics
------------------
kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic
kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic
-kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic
kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog
Create Topics
@@ -11,7 +10,6 @@ Create Topics
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
-kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic
To List topics
----------------
@@ -26,6 +24,3 @@ To Listen for Output
kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
-
-kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning
-
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
index 28e096352..890e0a6ba 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
@@ -28,9 +28,6 @@ object MessagePrioritizationConstants {
const val SOURCE_INPUT = "source-prioritization-input"
const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
- const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate"
- const val PROCESSOR_OUTPUT = "processor-prioritization-output"
const val SINK_OUTPUT = "sink-prioritization-output"
- const val SINK_EXPIRED = "sink-prioritization-expired"
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
new file mode 100644
index 000000000..584fd00d3
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+
+interface MessagePrioritizationService {
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+
+ suspend fun prioritize(messagePrioritization: MessagePrioritization)
+
+ suspend fun output(id: String)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
new file mode 100644
index 000000000..5dd41d7f3
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.Date
+
+interface MessagePrioritizationStateService {
+
+ suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+ suspend fun getMessage(id: String): MessagePrioritization
+
+ suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
+ suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+ suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>?
+
+ suspend fun updateMessagesState(ids: List<String>, state: String)
+
+ suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+ suspend fun setMessageState(id: String, state: String)
+
+ suspend fun setMessagesPriority(ids: List<String>, priority: String)
+
+ suspend fun setMessagesState(ids: List<String>, state: String)
+
+ suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+ suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
+
+ suspend fun deleteMessage(id: String)
+
+ suspend fun deleteMessageByGroup(group: String)
+
+ suspend fun deleteMessageStates(group: String, states: List<String>)
+
+ suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index 39d081455..05b820adb 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -17,7 +17,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -36,14 +35,18 @@ fun AbstractComponentFunction.messagePrioritizationStateService() =
/**
* MessagePrioritization correlation extensions
*/
+
+/**
+ * Arrange comma separated correlation keys in ascending order.
+ */
fun MessagePrioritization.toFormatedCorrelation(): String {
- val ascendingKey = this.correlationId!!.split(",")
+ return this.correlationId!!.split(",")
.map { it.trim() }.sorted().joinToString(",")
- return ascendingKey
}
+/**
+ * Used to group the correlation with respect to types.
+ */
fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
- val ascendingKey = this.correlationId!!.split(",")
- .map { it.trim() }.sorted().joinToString(",")
- return TypeCorrelationKey(this.type, ascendingKey)
+ return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
index 262dcb402..e90771fb8 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
@@ -16,9 +16,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
@@ -31,7 +32,10 @@ import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping(value = ["/api/v1/message-prioritization"])
-open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) {
+open class MessagePrioritizationApi(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) {
@GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
@ResponseBody
@@ -53,6 +57,15 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic
}
@PostMapping(
+ path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
+ @ResponseBody
+ fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
+ messagePrioritizationService.prioritize(messagePrioritization)
+ }
+
+ @PostMapping(
path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
consumes = [MediaType.APPLICATION_JSON_VALUE]
)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
index 35566abb4..656646ff7 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
@@ -14,38 +14,21 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */
+/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
private val log = logger(AbstractMessagePrioritizeProcessor::class)
lateinit var prioritizationConfiguration: PrioritizationConfiguration
- lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
- var clusterService: BluePrintClusterService? = null
- override fun init(context: ProcessorContext) {
- this.processorContext = context
- /** Get the State service to update in store */
- this.messagePrioritizationStateService = BluePrintDependencyService
- .messagePrioritizationStateService()
- }
-
- /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
- open fun initializeClusterService() {
- /** Get the Cluster service to update in store */
- if (BluePrintConstants.CLUSTER_ENABLED) {
- this.clusterService = BluePrintDependencyService.clusterService()
- }
+ override fun init(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
new file mode 100644
index 000000000..c14a404ad
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.Cancellable
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.PunctuationType
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.time.Duration
+
+open class DefaultMessagePrioritizeProcessor(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ lateinit var expiryCancellable: Cancellable
+ lateinit var cleanCancellable: Cancellable
+
+ override suspend fun processNB(key: ByteArray, value: ByteArray) {
+
+ val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ try {
+ messagePrioritizationService.setKafkaProcessorContext(processorContext)
+ messagePrioritizationService.prioritize(messagePrioritize)
+ } catch (e: Exception) {
+ messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+ log.error(messagePrioritize.error)
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
+ /** Publish to Output topic */
+ this.processorContext.forward(
+ messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ super.init(context)
+ /** set up expiry marking cron */
+ initializeExpiryPunctuator()
+ /** Set up cleaning records cron */
+ initializeCleanPunctuator()
+ }
+
+ override fun close() {
+ log.info(
+ "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
+ expiryCancellable.cancel()
+ cleanCancellable.cancel()
+ }
+
+ open fun initializeExpiryPunctuator() {
+ val expiryPunctuator =
+ MessagePriorityExpiryPunctuator(
+ messagePrioritizationStateService
+ )
+ expiryPunctuator.processorContext = processorContext
+ expiryPunctuator.configuration = prioritizationConfiguration
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ expiryCancellable = processorContext.schedule(
+ Duration.ofMillis(expiryConfiguration.frequencyMilli),
+ PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+ )
+ log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
+ }
+
+ open fun initializeCleanPunctuator() {
+ val cleanPunctuator =
+ MessagePriorityCleanPunctuator(
+ messagePrioritizationStateService
+ )
+ cleanPunctuator.processorContext = processorContext
+ cleanPunctuator.configuration = prioritizationConfiguration
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ cleanCancellable = processorContext.schedule(
+ Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+ PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+ )
+ log.info(
+ "Clean punctuator setup complete with expiry " +
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ )
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
index b611060f7..d7666a20b 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
@@ -14,11 +14,12 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -42,7 +43,7 @@ open class MessagePrioritizationConsumer(
}
open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
- KafkaStreamConsumerFunction {
+ KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
override suspend fun createTopology(
@@ -52,7 +53,7 @@ open class MessagePrioritizationConsumer(
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
@@ -68,39 +69,12 @@ open class MessagePrioritizationConsumer(
MessagePrioritizationConstants.SOURCE_INPUT
)
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE
- )
-
- topology.addSink(
- MessagePrioritizationConstants.SINK_EXPIRED,
- prioritizationConfiguration.expiredTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
/** To receive completed and error messages */
topology.addSink(
MessagePrioritizationConstants.SINK_OUTPUT,
prioritizationConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- MessagePrioritizationConstants.PROCESSOR_OUTPUT
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
)
// Output will be sent to the group-output topic from Processor API
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
index 5435ebe30..e27cf16d0 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
@@ -14,13 +14,13 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.To
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -46,7 +46,7 @@ class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateServ
fetchMessages.forEach { expired ->
processorContext.forward(
expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_EXPIRED)
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
)
}
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
index f2a481f74..5595863d4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
index 4e4e2da7a..13c0dd7bc 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
@@ -14,43 +14,42 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
-import java.util.UUID
-open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
- private val log = logger(MessagePrioritizeProcessor::class)
+ private val log = logger(AbstractMessagePrioritizationService::class)
- lateinit var expiryCancellable: Cancellable
- lateinit var cleanCancellable: Cancellable
+ var processorContext: ProcessorContext? = null
- override suspend fun processNB(key: ByteArray, value: ByteArray) {
- log.info("***** received in prioritize processor key(${String(key)})")
- val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
+ log.info("***** received in prioritize processor key(${messagePrioritize.id})")
/** Get the cluster lock for message group */
- val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
/** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
@@ -59,58 +58,16 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
messagePrioritize.id, MessageState.ERROR.name,
messagePrioritize.error!!
)
- /** Publish to Output topic */
- this.processorContext.forward(
- messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
}
}
- override fun init(context: ProcessorContext) {
- super.init(context)
- /** set up expiry marking cron */
- initializeExpiryPunctuator()
- /** Set up cleaning records cron */
- initializeCleanPunctuator()
- /** Set up Cluster Service */
- initializeClusterService()
- }
-
- override fun close() {
- log.info(
- "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- expiryCancellable.cancel()
- cleanCancellable.cancel()
- }
-
- open fun initializeExpiryPunctuator() {
- val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
- expiryPunctuator.processorContext = processorContext
- expiryPunctuator.configuration = prioritizationConfiguration
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(
- Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
- )
- log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
- }
-
- open fun initializeCleanPunctuator() {
- val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
- cleanPunctuator.processorContext = processorContext
- cleanPunctuator.configuration = prioritizationConfiguration
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(
- Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
- )
- log.info(
- "Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
- )
+ override suspend fun output(id: String) {
+ log.info("$$$$$ received in output processor id($id)")
+ val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
+ /** Check for Kafka Processing, If yes, then send to the output topic */
+ if (this.processorContext != null) {
+ processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
}
open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
@@ -126,10 +83,11 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
)
/** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
- group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
- )
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+ .getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
/** If multiple records found, then check correlation */
if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
@@ -139,12 +97,9 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
if (correlationResults.correlated) {
/** Correlation satisfied */
- val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
- /** Send only correlated ids to next processor */
- this.processorContext.forward(
- UUID.randomUUID().toString(), correlatedIds,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
+ /** Send only correlated ids to aggregate processor */
+ aggregate(correlatedIds)
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
@@ -159,16 +114,57 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
} else {
// No Correlation check needed, simply forward to next processor.
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization.id,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ aggregate(messagePrioritization.id)
+ }
+ }
+
+ open suspend fun aggregate(strIds: String) {
+ log.info("@@@@@ received in aggregation processor ids($strIds)")
+ val ids = strIds.split(",").map { it.trim() }
+ if (!ids.isNullOrEmpty()) {
+ try {
+ if (ids.size == 1) {
+ /** No aggregation or sequencing needed, simpley forward to next processor */
+ output(ids.first())
+ } else {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(ids)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ }
+ } catch (e: Exception) {
+ val error = "failed in Aggregate message($ids) : ${e.message}"
+ log.error(error, e)
+ val storeMessages = messagePrioritizationStateService.getMessages(ids)
+ if (!storeMessages.isNullOrEmpty()) {
+ storeMessages.forEach { messagePrioritization ->
+ try {
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritization.id,
+ MessageState.ERROR.name, error
+ )
+ /** Publish to output topic */
+ output(messagePrioritization.id)
+ } catch (sendException: Exception) {
+ log.error(
+ "failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}", e
+ )
+ }
+ }
+ }
+ }
}
}
+ /** Child will override this implementation , if necessary
+ * Here the place child has to implement custom Sequencing and Aggregation logic.
+ * */
+ abstract suspend fun handleAggregation(messageIds: List<String>)
+
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
* otherwise correlation happens with group and correlationId */
- open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return null
- }
+ abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
index 017658ff6..d9cd956bf 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
@@ -16,6 +16,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
@@ -27,59 +28,10 @@ import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.Date
-interface MessagePrioritizationStateService {
-
- suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
-
- suspend fun getMessage(id: String): MessagePrioritization
-
- suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
-
- suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
-
- suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
-
- suspend fun getCorrelatedMessages(
- group: String,
- states: List<String>,
- types: List<String>?,
- correlationIds: String
- ): List<MessagePrioritization>?
-
- suspend fun updateMessagesState(ids: List<String>, state: String)
-
- suspend fun updateMessageState(id: String, state: String): MessagePrioritization
-
- suspend fun setMessageState(id: String, state: String)
-
- suspend fun setMessagesPriority(ids: List<String>, priority: String)
-
- suspend fun setMessagesState(ids: List<String>, state: String)
-
- suspend fun setMessageStateANdError(id: String, state: String, error: String)
-
- suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
-
- suspend fun deleteMessage(id: String)
-
- suspend fun deleteMessageByGroup(group: String)
-
- suspend fun deleteMessageStates(group: String, states: List<String>)
-
- suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
-}
-
@Service
open class MessagePrioritizationStateServiceImpl(
private val prioritizationMessageRepository: PrioritizationMessageRepository
-) :
- MessagePrioritizationStateService {
+) : MessagePrioritizationStateService {
private val log = logger(MessagePrioritizationStateServiceImpl::class)
@@ -110,7 +62,7 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
group,
states, Date(), PageRequest.of(0, count)
@@ -118,7 +70,7 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
group,
states, Date(), PageRequest.of(0, count)
@@ -126,7 +78,7 @@ open class MessagePrioritizationStateServiceImpl(
}
override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByByGroupAndExpiredDate(
group,
expiryDate, PageRequest.of(0, count)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
new file mode 100644
index 000000000..fcdb71cda
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messageIds: List<String>) {
+ log.info("messages($messageIds) aggregated")
+ messageIds.forEach { id ->
+ output(id)
+ }
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return when (messagePrioritization.group) {
+ /** Dummy Implementation, This can also be read from file and stored as cached map **/
+ "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ else -> null
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
deleted file mode 100644
index 3e697e633..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
- private val log = logger(MessageAggregateProcessor::class)
-
- override suspend fun processNB(key: String, value: String) {
-
- log.info("@@@@@ received in aggregation processor key($key), value($value)")
- val ids = value.split(",").map { it.trim() }
- if (!ids.isNullOrEmpty()) {
- try {
- if (ids.size == 1) {
- processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- } else {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(ids)
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
- }
- } catch (e: Exception) {
- val error = "failed in Aggregate message($ids) : ${e.message}"
- log.error(error, e)
- val storeMessages = messagePrioritizationStateService.getMessages(ids)
- if (!storeMessages.isNullOrEmpty()) {
- storeMessages.forEach { messagePrioritization ->
- try {
- /** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(
- messagePrioritization.id,
- MessageState.ERROR.name, error
- )
- /** Publish to Error topic */
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- } catch (sendException: Exception) {
- log.error(
- "failed to update/publish error message(${messagePrioritization.id}) : " +
- "${sendException.message}", e
- )
- }
- }
- }
- }
- }
- }
-
- /** Child will override this implementation , if necessary */
- open suspend fun handleAggregation(messageIds: List<String>) {
- log.info("messages($messageIds) aggregated")
- messageIds.forEach { id ->
- processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- }
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
deleted file mode 100644
index cf6520df5..000000000
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
- private val log = logger(MessageOutputProcessor::class)
-
- override suspend fun processNB(key: String, value: String) {
- log.info("$$$$$ received in output processor key($key), value($value)")
- val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
- processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index d1f38f4f2..186499d66 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -17,23 +17,27 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
import org.apache.kafka.streams.processor.ProcessorSupplier
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
- /** Utility to create the cluster lock for message [messagePrioritization] */
- suspend fun prioritizationGrouplock(
- clusterService: BluePrintClusterService?,
- messagePrioritization: MessagePrioritization
- ): ClusterLock? {
- return if (clusterService != null && clusterService.clusterJoined()) {
- val lockName = "prioritization-${messagePrioritization.group}"
+ /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
+ suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+
+ return if (clusterService != null && clusterService.clusterJoined() &&
+ !messagePrioritization.correlationId.isNullOrBlank()
+ ) {
+ // Get the correlation key in ascending order, even it it is misplaced
+ val correlationId = messagePrioritization.toFormatedCorrelation()
+ val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
@@ -41,14 +45,15 @@ object MessageProcessorUtils {
} else null
}
- /** Utility used to cluster unlock for message [messagePrioritization] */
- suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
- if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+ /** Utility used to cluster unlock for message [clusterLock] */
+ suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) {
+ if (clusterLock != null) {
clusterLock.unLock()
clusterLock.close()
}
}
+ /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index f9e23e826..ec0515c42 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -27,12 +27,13 @@ import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
@@ -43,6 +44,7 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
@@ -72,6 +74,8 @@ import kotlin.test.assertNotNull
)
open class MessagePrioritizationConsumerTest {
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
@Autowired
lateinit var applicationContext: ApplicationContext
@@ -82,6 +86,9 @@ open class MessagePrioritizationConsumerTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Autowired
+ lateinit var messagePrioritizationService: MessagePrioritizationService
+
+ @Autowired
lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
@Before
@@ -107,6 +114,38 @@ open class MessagePrioritizationConsumerTest {
}
@Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ assertTrue(
+ ::messagePrioritizationService.isInitialized,
+ "failed to initialize messagePrioritizationService"
+ )
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
+ @Test
fun testStartConsuming() {
runBlocking {
val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
@@ -118,7 +157,9 @@ open class MessagePrioritizationConsumerTest {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService
+ )
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Test Topology
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
index 3d3d0c6f5..0285079ad 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
@@ -17,10 +17,9 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
@@ -65,22 +64,17 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
*/
@Service
-open class SampleMessagePrioritizationConsumer(
+open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+
+/** For Kafka Consumer **/
+@Service
+open class TestMessagePrioritizationConsumer(
bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return when (messagePrioritization.group) {
- "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
- else -> null
- }
- }
-}
-
-@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
-
-@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class SampleMessageOutputProcessor : MessageOutputProcessor()
+open class TestMessagePrioritizeProcessor(
+ messagePrioritizationStateService: MessagePrioritizationStateService,
+ messagePrioritizationService: MessagePrioritizationService
+) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
index 8ea15935f..8ef290303 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.context.annotation.Configuration
@@ -29,3 +30,10 @@ open class BluePrintAtomixLibConfiguration
*/
fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
instance(AtomixBluePrintClusterService::class)
+
+/** Optional Cluster Service, returns only if Cluster is enabled */
+fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
+ return if (BluePrintConstants.CLUSTER_ENABLED) {
+ BluePrintDependencyService.clusterService()
+ } else null
+}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index a2a0d3902..9be15f2e3 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.AtomicLock
import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
@@ -93,10 +94,21 @@ object AtomixLibUtils {
val protocol = MultiPrimaryProtocol.builder()
.withBackups(numBackups)
.build()
+ return atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ }
+
+ /** get Atomic distributed lock, to get lock fence information */
+ fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
- val lock = atomix.lockBuilder(lockName)
+ return atomix.atomicLockBuilder(lockName)
.withProtocol(protocol)
.build()
- return lock
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 39453fc7a..67bf4cabb 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -35,7 +35,7 @@ import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
- val log = logger(AtomixBluePrintClusterServiceTest::class)
+ private val log = logger(AtomixBluePrintClusterServiceTest::class)
@Before
fun init() {
@@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest {
@Test
fun testClusterJoin() {
runBlocking {
- val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
- val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(5679, 5680)).toMutableList()
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
+ // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ val bluePrintClusterService = bluePrintClusterServiceOne[0]
log.info("Members : ${bluePrintClusterService.allMembers()}")
log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
@@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ private suspend fun createCluster(
+ ports: List<Int>,
+ otherClusterPorts: List<Int>? = null
+ ): List<BluePrintClusterService> {
+
return withContext(Dispatchers.Default) {
- val members = ports.map { "node-$it" }
+ val clusterMembers = ports.map { "node-$it" }.toMutableList()
+ /** Add the other cluster as members */
+ if (!otherClusterPorts.isNullOrEmpty()) {
+ val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
+ clusterMembers.addAll(otherClusterMembers)
+ }
val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
val clusterInfo = ClusterInfo(
id = "test-cluster", nodeId = nodeId,
- clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+ clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
@@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest {
private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
@@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(bluePrintClusterServices.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices[0], "first", lockName)
}
val deferred2 = async {
- executeLock(bluePrintClusterServices.get(0), "second", lockName)
+ executeLock(bluePrintClusterServices[0], "second", lockName)
}
val deferred3 = async {
- executeLock(bluePrintClusterServices.get(0), "third", lockName)
+ executeLock(bluePrintClusterServices[1], "third", lockName)
}
deferred.start()
deferred2.start()