diff options
author | Brinda Santh <bs2796@att.com> | 2020-01-07 14:56:53 -0500 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2020-01-07 14:59:07 -0500 |
commit | 0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 (patch) | |
tree | 527670a53e5e4cd348a2d6cecca062306130834c /ms/blueprintsprocessor/modules/commons | |
parent | 5e5718c9191f125ad65bd9ab15334147bedd79cc (diff) |
Include correlationId in group lock.
Message prioritization optimization by checking and including correlation Id, so that non related correlation message won't get locked.
Optimized Atomix Junit Test cases.
Issue-ID: CCSDK-2011
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
2 files changed, 37 insertions, 14 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt index a2a0d3902..9be15f2e3 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import io.atomix.core.Atomix +import io.atomix.core.lock.AtomicLock import io.atomix.core.lock.DistributedLock import io.atomix.core.map.DistributedMap import io.atomix.protocols.backup.MultiPrimaryProtocol @@ -93,10 +94,21 @@ object AtomixLibUtils { val protocol = MultiPrimaryProtocol.builder() .withBackups(numBackups) .build() + return atomix.lockBuilder(lockName) + .withProtocol(protocol) + .build() + } + + /** get Atomic distributed lock, to get lock fence information */ + fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock { + check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(numBackups) + .build() - val lock = atomix.lockBuilder(lockName) + return atomix.atomicLockBuilder(lockName) .withProtocol(protocol) .build() - return lock } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index 39453fc7a..67bf4cabb 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -35,7 +35,7 @@ import kotlin.test.assertNotNull import kotlin.test.assertTrue class AtomixBluePrintClusterServiceTest { - val log = logger(AtomixBluePrintClusterServiceTest::class) + private val log = logger(AtomixBluePrintClusterServiceTest::class) @Before fun init() { @@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest { @Test fun testClusterJoin() { runBlocking { - val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680)) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682)) - val bluePrintClusterService = bluePrintClusterServiceOne.get(0) + val bluePrintClusterServiceOne = + createCluster(arrayListOf(5679, 5680)).toMutableList() + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680)) + // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + val bluePrintClusterService = bluePrintClusterServiceOne[0] log.info("Members : ${bluePrintClusterService.allMembers()}") log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}") log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}") @@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest { } } - private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> { + private suspend fun createCluster( + ports: List<Int>, + otherClusterPorts: List<Int>? = null + ): List<BluePrintClusterService> { + return withContext(Dispatchers.Default) { - val members = ports.map { "node-$it" } + val clusterMembers = ports.map { "node-$it" }.toMutableList() + /** Add the other cluster as members */ + if (!otherClusterPorts.isNullOrEmpty()) { + val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList() + clusterMembers.addAll(otherClusterMembers) + } val deferred = ports.map { port -> async(Dispatchers.IO) { val nodeId = "node-$port" log.info("********** Starting node($nodeId) on port($port)") val clusterInfo = ClusterInfo( id = "test-cluster", nodeId = nodeId, - clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster" + clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster" ) val atomixClusterService = AtomixBluePrintClusterService() atomixClusterService.startCluster(clusterInfo) @@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest { private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) { /** Test Distributed store creation */ repeat(2) { storeId -> - val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>( + val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>( "blueprint-runtime-$storeId" ).toDistributedMap() assertNotNull(store, "failed to get store") - val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>( + val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>( "blueprint-runtime-$storeId" ).toDistributedMap() @@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest { val lockName = "sample-lock" withContext(Dispatchers.IO) { val deferred = async { - executeLock(bluePrintClusterServices.get(0), "first", lockName) + executeLock(bluePrintClusterServices[0], "first", lockName) } val deferred2 = async { - executeLock(bluePrintClusterServices.get(0), "second", lockName) + executeLock(bluePrintClusterServices[0], "second", lockName) } val deferred3 = async { - executeLock(bluePrintClusterServices.get(0), "third", lockName) + executeLock(bluePrintClusterServices[1], "third", lockName) } deferred.start() deferred2.start() |