aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-01-07 14:56:53 -0500
committerBrinda Santh <bs2796@att.com>2020-01-07 14:59:07 -0500
commit0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 (patch)
tree527670a53e5e4cd348a2d6cecca062306130834c /ms/blueprintsprocessor
parent5e5718c9191f125ad65bd9ab15334147bedd79cc (diff)
Include correlationId in group lock.
Message prioritization optimization by checking and including correlation Id, so that non related correlation message won't get locked. Optimized Atomix Junit Test cases. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml64
-rwxr-xr-xms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml36
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt14
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt16
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt35
6 files changed, 144 insertions, 30 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
index a37089f10..020038c26 100644
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -9,7 +9,9 @@ services:
ports:
- "3306:3306"
volumes:
- - ~/vm_mysql:/var/lib/mysql
+ - target: /var/lib/mysql
+ type: volume
+ source: mysql-data
restart: always
environment:
MYSQL_ROOT_PASSWORD: sdnctl
@@ -45,8 +47,8 @@ services:
type: volume
source: blueprints-deploy
- target: /opt/app/onap/config
- type: bind
- source: ./config
+ type: volume
+ source: controller-config
environment:
# Same as hostname and container name
CLUSTER_ENABLED: "true"
@@ -79,8 +81,8 @@ services:
type: volume
source: blueprints-deploy
- target: /opt/app/onap/config
- type: bind
- source: ./config
+ type: volume
+ source: resource-resolution-config
environment:
CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
@@ -94,8 +96,60 @@ services:
APP_CONFIG_HOME: /opt/app/onap/config
STICKYSELECTORKEY:
ENVCONTEXT: dev
+ py-executor-0:
+ depends_on:
+ - db
+ - nats
+ image: onap/ccsdk-py-executor
+ container_name: py-executor-0
+ hostname: py-executor-0
+ networks:
+ - cds-network
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ environment:
+ CLUSTER_ID: cds-cluster
+ CLUSTER_NODE_ID: py-executor-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0
+ NATS_HOSTS: nats://nats:4222
+ APPLICATIONNAME: py-executor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ AUTH_TYPE: tls-auth
+ LOG_FILE: /opt/app/onap/logs/application.log
volumes:
+ mysql-data:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/mysql/data
+ o: bind
blueprints-deploy:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/blueprints/deploy
+ o: bind
+ controller-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/cds-controller/config
+ o: bind
+ resource-resolution-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/resource-resolution/config
+ o: bind
networks:
cds-network:
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index d87770286..20b17bc90 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -9,7 +9,9 @@ services:
ports:
- "3306:3306"
volumes:
- - ~/vm_mysql:/var/lib/mysql
+ - target: /var/lib/mysql
+ type: volume
+ source: mysql-data
restart: always
environment:
MYSQL_ROOT_PASSWORD: sdnctl
@@ -29,7 +31,12 @@ services:
- "9111:9111"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
+ - target: /opt/app/onap/config
+ type: volume
+ source: controller-config
environment:
APPLICATIONNAME: cds-controller
BUNDLEVERSION: 1.0.0
@@ -47,7 +54,9 @@ services:
- "50051:50051"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
py-executor-default:
depends_on:
- db
@@ -60,7 +69,9 @@ services:
- "50052:50052"
restart: always
volumes:
- - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ - target: /opt/app/onap/blueprints/deploy
+ type: volume
+ source: blueprints-deploy
environment:
APPLICATIONNAME: py-executor
BUNDLEVERSION: 1.0.0
@@ -72,7 +83,24 @@ services:
LOG_FILE: /opt/app/onap/logs/application.log
volumes:
+ mysql-data:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/mysql/data
+ o: bind
blueprints-deploy:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/blueprints/deploy
+ o: bind
+ controller-config:
+ driver: local
+ driver_opts:
+ type: none
+ device: /opt/app/cds/cds-controller/config
+ o: bind
networks:
cds-network:
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index 39d081455..bef7a7b61 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -36,14 +36,18 @@ fun AbstractComponentFunction.messagePrioritizationStateService() =
/**
* MessagePrioritization correlation extensions
*/
+
+/**
+ * Arrange comma separated correlation keys in ascending order.
+ */
fun MessagePrioritization.toFormatedCorrelation(): String {
- val ascendingKey = this.correlationId!!.split(",")
+ return this.correlationId!!.split(",")
.map { it.trim() }.sorted().joinToString(",")
- return ascendingKey
}
+/**
+ * Used to group the correlation with respect to types.
+ */
fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
- val ascendingKey = this.correlationId!!.split(",")
- .map { it.trim() }.sorted().joinToString(",")
- return TypeCorrelationKey(this.type, ascendingKey)
+ return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
index d1f38f4f2..49230b6e4 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -22,6 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -32,8 +33,12 @@ object MessageProcessorUtils {
clusterService: BluePrintClusterService?,
messagePrioritization: MessagePrioritization
): ClusterLock? {
- return if (clusterService != null && clusterService.clusterJoined()) {
- val lockName = "prioritization-${messagePrioritization.group}"
+ return if (clusterService != null && clusterService.clusterJoined() &&
+ !messagePrioritization.correlationId.isNullOrBlank()
+ ) {
+ // Get the correlation key in ascending order, even it it is misplaced
+ val correlationId = messagePrioritization.toFormatedCorrelation()
+ val lockName = "prioritization-${messagePrioritization.group}-$correlationId"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index a2a0d3902..9be15f2e3 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.AtomicLock
import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
@@ -93,10 +94,21 @@ object AtomixLibUtils {
val protocol = MultiPrimaryProtocol.builder()
.withBackups(numBackups)
.build()
+ return atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ }
+
+ /** get Atomic distributed lock, to get lock fence information */
+ fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
- val lock = atomix.lockBuilder(lockName)
+ return atomix.atomicLockBuilder(lockName)
.withProtocol(protocol)
.build()
- return lock
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 39453fc7a..67bf4cabb 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -35,7 +35,7 @@ import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
- val log = logger(AtomixBluePrintClusterServiceTest::class)
+ private val log = logger(AtomixBluePrintClusterServiceTest::class)
@Before
fun init() {
@@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest {
@Test
fun testClusterJoin() {
runBlocking {
- val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
- val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(5679, 5680)).toMutableList()
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
+ // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ val bluePrintClusterService = bluePrintClusterServiceOne[0]
log.info("Members : ${bluePrintClusterService.allMembers()}")
log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
@@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ private suspend fun createCluster(
+ ports: List<Int>,
+ otherClusterPorts: List<Int>? = null
+ ): List<BluePrintClusterService> {
+
return withContext(Dispatchers.Default) {
- val members = ports.map { "node-$it" }
+ val clusterMembers = ports.map { "node-$it" }.toMutableList()
+ /** Add the other cluster as members */
+ if (!otherClusterPorts.isNullOrEmpty()) {
+ val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
+ clusterMembers.addAll(otherClusterMembers)
+ }
val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
val clusterInfo = ClusterInfo(
id = "test-cluster", nodeId = nodeId,
- clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+ clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
@@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest {
private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
@@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(bluePrintClusterServices.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices[0], "first", lockName)
}
val deferred2 = async {
- executeLock(bluePrintClusterServices.get(0), "second", lockName)
+ executeLock(bluePrintClusterServices[0], "second", lockName)
}
val deferred3 = async {
- executeLock(bluePrintClusterServices.get(0), "third", lockName)
+ executeLock(bluePrintClusterServices[1], "third", lockName)
}
deferred.start()
deferred2.start()