From 0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 7 Jan 2020 14:56:53 -0500 Subject: Include correlationId in group lock. Message prioritization optimization by checking and including correlation Id, so that non related correlation message won't get locked. Optimized Atomix Junit Test cases. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1 --- .../atomix/utils/AtomixLibUtils.kt | 16 ++++++++-- .../atomix/AtomixBluePrintClusterServiceTest.kt | 35 ++++++++++++++-------- 2 files changed, 37 insertions(+), 14 deletions(-) (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib') diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt index a2a0d3902..9be15f2e3 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import io.atomix.core.Atomix +import io.atomix.core.lock.AtomicLock import io.atomix.core.lock.DistributedLock import io.atomix.core.map.DistributedMap import io.atomix.protocols.backup.MultiPrimaryProtocol @@ -90,13 +91,24 @@ object AtomixLibUtils { fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } + val protocol = MultiPrimaryProtocol.builder() + .withBackups(numBackups) + .build() + return atomix.lockBuilder(lockName) + .withProtocol(protocol) + .build() + } + + /** get Atomic distributed lock, to get lock fence information */ + fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock { + check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" } + val protocol = MultiPrimaryProtocol.builder() .withBackups(numBackups) .build() - val lock = atomix.lockBuilder(lockName) + return atomix.atomicLockBuilder(lockName) .withProtocol(protocol) .build() - return lock } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index 39453fc7a..67bf4cabb 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -35,7 +35,7 @@ import kotlin.test.assertNotNull import kotlin.test.assertTrue class AtomixBluePrintClusterServiceTest { - val log = logger(AtomixBluePrintClusterServiceTest::class) + private val log = logger(AtomixBluePrintClusterServiceTest::class) @Before fun init() { @@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest { @Test fun testClusterJoin() { runBlocking { - val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680)) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682)) - val bluePrintClusterService = bluePrintClusterServiceOne.get(0) + val bluePrintClusterServiceOne = + createCluster(arrayListOf(5679, 5680)).toMutableList() + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680)) + // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + val bluePrintClusterService = bluePrintClusterServiceOne[0] log.info("Members : ${bluePrintClusterService.allMembers()}") log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}") log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}") @@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest { } } - private suspend fun createCluster(ports: List): List { + private suspend fun createCluster( + ports: List, + otherClusterPorts: List? = null + ): List { + return withContext(Dispatchers.Default) { - val members = ports.map { "node-$it" } + val clusterMembers = ports.map { "node-$it" }.toMutableList() + /** Add the other cluster as members */ + if (!otherClusterPorts.isNullOrEmpty()) { + val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList() + clusterMembers.addAll(otherClusterMembers) + } val deferred = ports.map { port -> async(Dispatchers.IO) { val nodeId = "node-$port" log.info("********** Starting node($nodeId) on port($port)") val clusterInfo = ClusterInfo( id = "test-cluster", nodeId = nodeId, - clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster" + clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster" ) val atomixClusterService = AtomixBluePrintClusterService() atomixClusterService.startCluster(clusterInfo) @@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest { private suspend fun testDistributedStore(bluePrintClusterServices: List) { /** Test Distributed store creation */ repeat(2) { storeId -> - val store = bluePrintClusterServices.get(0).clusterMapStore( + val store = bluePrintClusterServices[0].clusterMapStore( "blueprint-runtime-$storeId" ).toDistributedMap() assertNotNull(store, "failed to get store") - val store1 = bluePrintClusterServices.get(0).clusterMapStore( + val store1 = bluePrintClusterServices[1].clusterMapStore( "blueprint-runtime-$storeId" ).toDistributedMap() @@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest { val lockName = "sample-lock" withContext(Dispatchers.IO) { val deferred = async { - executeLock(bluePrintClusterServices.get(0), "first", lockName) + executeLock(bluePrintClusterServices[0], "first", lockName) } val deferred2 = async { - executeLock(bluePrintClusterServices.get(0), "second", lockName) + executeLock(bluePrintClusterServices[0], "second", lockName) } val deferred3 = async { - executeLock(bluePrintClusterServices.get(0), "third", lockName) + executeLock(bluePrintClusterServices[1], "third", lockName) } deferred.start() deferred2.start() -- cgit 1.2.3-korg