From 0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 7 Jan 2020 14:56:53 -0500 Subject: Include correlationId in group lock. Message prioritization optimization by checking and including correlation Id, so that non related correlation message won't get locked. Optimized Atomix Junit Test cases. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1 --- .../src/main/dc/docker-compose-cluster.yaml | 64 ++++++++++++++++++++-- .../application/src/main/dc/docker-compose.yaml | 36 ++++++++++-- .../prioritization/MessagePrioritizeExtensions.kt | 14 +++-- .../prioritization/utils/MessageProcessorUtils.kt | 9 ++- .../atomix/utils/AtomixLibUtils.kt | 16 +++++- .../atomix/AtomixBluePrintClusterServiceTest.kt | 35 ++++++++---- 6 files changed, 144 insertions(+), 30 deletions(-) diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml index a37089f10..020038c26 100644 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml @@ -9,7 +9,9 @@ services: ports: - "3306:3306" volumes: - - ~/vm_mysql:/var/lib/mysql + - target: /var/lib/mysql + type: volume + source: mysql-data restart: always environment: MYSQL_ROOT_PASSWORD: sdnctl @@ -45,8 +47,8 @@ services: type: volume source: blueprints-deploy - target: /opt/app/onap/config - type: bind - source: ./config + type: volume + source: controller-config environment: # Same as hostname and container name CLUSTER_ENABLED: "true" @@ -79,8 +81,8 @@ services: type: volume source: blueprints-deploy - target: /opt/app/onap/config - type: bind - source: ./config + type: volume + source: resource-resolution-config environment: CLUSTER_ENABLED: "true" CLUSTER_ID: cds-cluster @@ -94,8 +96,60 @@ services: APP_CONFIG_HOME: /opt/app/onap/config STICKYSELECTORKEY: ENVCONTEXT: dev + py-executor-0: + depends_on: + - db + - nats + image: onap/ccsdk-py-executor + container_name: py-executor-0 + hostname: py-executor-0 + networks: + - cds-network + ports: + - "50052:50052" + restart: always + volumes: + - target: /opt/app/onap/blueprints/deploy + type: volume + source: blueprints-deploy + environment: + CLUSTER_ID: cds-cluster + CLUSTER_NODE_ID: py-executor-0 + CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0 + NATS_HOSTS: nats://nats:4222 + APPLICATIONNAME: py-executor + BUNDLEVERSION: 1.0.0 + APP_CONFIG_HOME: /opt/app/onap/config + STICKYSELECTORKEY: + ENVCONTEXT: dev + APP_PORT: 50052 + AUTH_TYPE: tls-auth + LOG_FILE: /opt/app/onap/logs/application.log volumes: + mysql-data: + driver: local + driver_opts: + type: none + device: /opt/app/cds/mysql/data + o: bind blueprints-deploy: + driver: local + driver_opts: + type: none + device: /opt/app/cds/blueprints/deploy + o: bind + controller-config: + driver: local + driver_opts: + type: none + device: /opt/app/cds/cds-controller/config + o: bind + resource-resolution-config: + driver: local + driver_opts: + type: none + device: /opt/app/cds/resource-resolution/config + o: bind networks: cds-network: diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml index d87770286..20b17bc90 100755 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml @@ -9,7 +9,9 @@ services: ports: - "3306:3306" volumes: - - ~/vm_mysql:/var/lib/mysql + - target: /var/lib/mysql + type: volume + source: mysql-data restart: always environment: MYSQL_ROOT_PASSWORD: sdnctl @@ -29,7 +31,12 @@ services: - "9111:9111" restart: always volumes: - - blueprints-deploy:/opt/app/onap/blueprints/deploy + - target: /opt/app/onap/blueprints/deploy + type: volume + source: blueprints-deploy + - target: /opt/app/onap/config + type: volume + source: controller-config environment: APPLICATIONNAME: cds-controller BUNDLEVERSION: 1.0.0 @@ -47,7 +54,9 @@ services: - "50051:50051" restart: always volumes: - - blueprints-deploy:/opt/app/onap/blueprints/deploy + - target: /opt/app/onap/blueprints/deploy + type: volume + source: blueprints-deploy py-executor-default: depends_on: - db @@ -60,7 +69,9 @@ services: - "50052:50052" restart: always volumes: - - blueprints-deploy:/opt/app/onap/blueprints/deploy + - target: /opt/app/onap/blueprints/deploy + type: volume + source: blueprints-deploy environment: APPLICATIONNAME: py-executor BUNDLEVERSION: 1.0.0 @@ -72,7 +83,24 @@ services: LOG_FILE: /opt/app/onap/logs/application.log volumes: + mysql-data: + driver: local + driver_opts: + type: none + device: /opt/app/cds/mysql/data + o: bind blueprints-deploy: + driver: local + driver_opts: + type: none + device: /opt/app/cds/blueprints/deploy + o: bind + controller-config: + driver: local + driver_opts: + type: none + device: /opt/app/cds/cds-controller/config + o: bind networks: cds-network: diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt index 39d081455..bef7a7b61 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt @@ -36,14 +36,18 @@ fun AbstractComponentFunction.messagePrioritizationStateService() = /** * MessagePrioritization correlation extensions */ + +/** + * Arrange comma separated correlation keys in ascending order. + */ fun MessagePrioritization.toFormatedCorrelation(): String { - val ascendingKey = this.correlationId!!.split(",") + return this.correlationId!!.split(",") .map { it.trim() }.sorted().joinToString(",") - return ascendingKey } +/** + * Used to group the correlation with respect to types. + */ fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey { - val ascendingKey = this.correlationId!!.split(",") - .map { it.trim() }.sorted().joinToString(",") - return TypeCorrelationKey(this.type, ascendingKey) + return TypeCorrelationKey(this.type, this.toFormatedCorrelation()) } diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt index d1f38f4f2..49230b6e4 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt @@ -22,6 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization +import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -32,8 +33,12 @@ object MessageProcessorUtils { clusterService: BluePrintClusterService?, messagePrioritization: MessagePrioritization ): ClusterLock? { - return if (clusterService != null && clusterService.clusterJoined()) { - val lockName = "prioritization-${messagePrioritization.group}" + return if (clusterService != null && clusterService.clusterJoined() && + !messagePrioritization.correlationId.isNullOrBlank() + ) { + // Get the correlation key in ascending order, even it it is misplaced + val correlationId = messagePrioritization.toFormatedCorrelation() + val lockName = "prioritization-${messagePrioritization.group}-$correlationId" val clusterLock = clusterService.clusterLock(lockName) clusterLock.lock() if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)") diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt index a2a0d3902..9be15f2e3 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import io.atomix.core.Atomix +import io.atomix.core.lock.AtomicLock import io.atomix.core.lock.DistributedLock import io.atomix.core.map.DistributedMap import io.atomix.protocols.backup.MultiPrimaryProtocol @@ -90,13 +91,24 @@ object AtomixLibUtils { fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } + val protocol = MultiPrimaryProtocol.builder() + .withBackups(numBackups) + .build() + return atomix.lockBuilder(lockName) + .withProtocol(protocol) + .build() + } + + /** get Atomic distributed lock, to get lock fence information */ + fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock { + check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" } + val protocol = MultiPrimaryProtocol.builder() .withBackups(numBackups) .build() - val lock = atomix.lockBuilder(lockName) + return atomix.atomicLockBuilder(lockName) .withProtocol(protocol) .build() - return lock } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index 39453fc7a..67bf4cabb 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -35,7 +35,7 @@ import kotlin.test.assertNotNull import kotlin.test.assertTrue class AtomixBluePrintClusterServiceTest { - val log = logger(AtomixBluePrintClusterServiceTest::class) + private val log = logger(AtomixBluePrintClusterServiceTest::class) @Before fun init() { @@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest { @Test fun testClusterJoin() { runBlocking { - val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680)) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682)) - val bluePrintClusterService = bluePrintClusterServiceOne.get(0) + val bluePrintClusterServiceOne = + createCluster(arrayListOf(5679, 5680)).toMutableList() + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680)) + // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + val bluePrintClusterService = bluePrintClusterServiceOne[0] log.info("Members : ${bluePrintClusterService.allMembers()}") log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}") log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}") @@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest { } } - private suspend fun createCluster(ports: List): List { + private suspend fun createCluster( + ports: List, + otherClusterPorts: List? = null + ): List { + return withContext(Dispatchers.Default) { - val members = ports.map { "node-$it" } + val clusterMembers = ports.map { "node-$it" }.toMutableList() + /** Add the other cluster as members */ + if (!otherClusterPorts.isNullOrEmpty()) { + val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList() + clusterMembers.addAll(otherClusterMembers) + } val deferred = ports.map { port -> async(Dispatchers.IO) { val nodeId = "node-$port" log.info("********** Starting node($nodeId) on port($port)") val clusterInfo = ClusterInfo( id = "test-cluster", nodeId = nodeId, - clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster" + clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster" ) val atomixClusterService = AtomixBluePrintClusterService() atomixClusterService.startCluster(clusterInfo) @@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest { private suspend fun testDistributedStore(bluePrintClusterServices: List) { /** Test Distributed store creation */ repeat(2) { storeId -> - val store = bluePrintClusterServices.get(0).clusterMapStore( + val store = bluePrintClusterServices[0].clusterMapStore( "blueprint-runtime-$storeId" ).toDistributedMap() assertNotNull(store, "failed to get store") - val store1 = bluePrintClusterServices.get(0).clusterMapStore( + val store1 = bluePrintClusterServices[1].clusterMapStore( "blueprint-runtime-$storeId" ).toDistributedMap() @@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest { val lockName = "sample-lock" withContext(Dispatchers.IO) { val deferred = async { - executeLock(bluePrintClusterServices.get(0), "first", lockName) + executeLock(bluePrintClusterServices[0], "first", lockName) } val deferred2 = async { - executeLock(bluePrintClusterServices.get(0), "second", lockName) + executeLock(bluePrintClusterServices[0], "second", lockName) } val deferred3 = async { - executeLock(bluePrintClusterServices.get(0), "third", lockName) + executeLock(bluePrintClusterServices[1], "third", lockName) } deferred.start() deferred2.start() -- cgit 1.2.3-korg