aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/atomix-lib/src
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-01-07 14:56:53 -0500
committerBrinda Santh <bs2796@att.com>2020-01-07 14:59:07 -0500
commit0e8d0fc7b44a17a558f88642e2e7a4de007a3da4 (patch)
tree527670a53e5e4cd348a2d6cecca062306130834c /ms/blueprintsprocessor/modules/commons/atomix-lib/src
parent5e5718c9191f125ad65bd9ab15334147bedd79cc (diff)
Include correlationId in group lock.
Message prioritization optimization by checking and including correlation Id, so that non related correlation message won't get locked. Optimized Atomix Junit Test cases. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I090ed4c193c7f9af4014cfeee4c6208c12b542c1
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt16
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt35
2 files changed, 37 insertions, 14 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index a2a0d3902..9be15f2e3 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.AtomicLock
import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
@@ -93,10 +94,21 @@ object AtomixLibUtils {
val protocol = MultiPrimaryProtocol.builder()
.withBackups(numBackups)
.build()
+ return atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ }
+
+ /** get Atomic distributed lock, to get lock fence information */
+ fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
- val lock = atomix.lockBuilder(lockName)
+ return atomix.atomicLockBuilder(lockName)
.withProtocol(protocol)
.build()
- return lock
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 39453fc7a..67bf4cabb 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -35,7 +35,7 @@ import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
- val log = logger(AtomixBluePrintClusterServiceTest::class)
+ private val log = logger(AtomixBluePrintClusterServiceTest::class)
@Before
fun init() {
@@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest {
@Test
fun testClusterJoin() {
runBlocking {
- val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
- val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(5679, 5680)).toMutableList()
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
+ // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ val bluePrintClusterService = bluePrintClusterServiceOne[0]
log.info("Members : ${bluePrintClusterService.allMembers()}")
log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
@@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ private suspend fun createCluster(
+ ports: List<Int>,
+ otherClusterPorts: List<Int>? = null
+ ): List<BluePrintClusterService> {
+
return withContext(Dispatchers.Default) {
- val members = ports.map { "node-$it" }
+ val clusterMembers = ports.map { "node-$it" }.toMutableList()
+ /** Add the other cluster as members */
+ if (!otherClusterPorts.isNullOrEmpty()) {
+ val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
+ clusterMembers.addAll(otherClusterMembers)
+ }
val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
val clusterInfo = ClusterInfo(
id = "test-cluster", nodeId = nodeId,
- clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+ clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
@@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest {
private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
@@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(bluePrintClusterServices.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices[0], "first", lockName)
}
val deferred2 = async {
- executeLock(bluePrintClusterServices.get(0), "second", lockName)
+ executeLock(bluePrintClusterServices[0], "second", lockName)
}
val deferred3 = async {
- executeLock(bluePrintClusterServices.get(0), "third", lockName)
+ executeLock(bluePrintClusterServices[1], "third", lockName)
}
deferred.start()
deferred2.start()