summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt16
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt35
2 files changed, 37 insertions, 14 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index a2a0d3902..9be15f2e3 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.AtomicLock
import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
@@ -93,10 +94,21 @@ object AtomixLibUtils {
val protocol = MultiPrimaryProtocol.builder()
.withBackups(numBackups)
.build()
+ return atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ }
+
+ /** get Atomic distributed lock, to get lock fence information */
+ fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
- val lock = atomix.lockBuilder(lockName)
+ return atomix.atomicLockBuilder(lockName)
.withProtocol(protocol)
.build()
- return lock
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 39453fc7a..67bf4cabb 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -35,7 +35,7 @@ import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
- val log = logger(AtomixBluePrintClusterServiceTest::class)
+ private val log = logger(AtomixBluePrintClusterServiceTest::class)
@Before
fun init() {
@@ -48,9 +48,11 @@ class AtomixBluePrintClusterServiceTest {
@Test
fun testClusterJoin() {
runBlocking {
- val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
- val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(5679, 5680)).toMutableList()
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
+ // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ val bluePrintClusterService = bluePrintClusterServiceOne[0]
log.info("Members : ${bluePrintClusterService.allMembers()}")
log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
@@ -59,16 +61,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ private suspend fun createCluster(
+ ports: List<Int>,
+ otherClusterPorts: List<Int>? = null
+ ): List<BluePrintClusterService> {
+
return withContext(Dispatchers.Default) {
- val members = ports.map { "node-$it" }
+ val clusterMembers = ports.map { "node-$it" }.toMutableList()
+ /** Add the other cluster as members */
+ if (!otherClusterPorts.isNullOrEmpty()) {
+ val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
+ clusterMembers.addAll(otherClusterMembers)
+ }
val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
val clusterInfo = ClusterInfo(
id = "test-cluster", nodeId = nodeId,
- clusterMembers = members, nodeAddress = "localhost:$port", storagePath = "target/cluster"
+ clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
@@ -82,11 +93,11 @@ class AtomixBluePrintClusterServiceTest {
private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
"blueprint-runtime-$storeId"
).toDistributedMap()
@@ -105,13 +116,13 @@ class AtomixBluePrintClusterServiceTest {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(bluePrintClusterServices.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices[0], "first", lockName)
}
val deferred2 = async {
- executeLock(bluePrintClusterServices.get(0), "second", lockName)
+ executeLock(bluePrintClusterServices[0], "second", lockName)
}
val deferred3 = async {
- executeLock(bluePrintClusterServices.get(0), "third", lockName)
+ executeLock(bluePrintClusterServices[1], "third", lockName)
}
deferred.start()
deferred2.start()