summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/atomix-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt23
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt63
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml2
4 files changed, 67 insertions, 26 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
index 696d728dd..17d243620 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
@@ -16,10 +16,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+import com.fasterxml.jackson.databind.JsonNode
import io.atomix.core.map.DistributedMap
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-fun <T : Map<*, *>> T.toDistributedMap(): DistributedMap<*, *> {
- return if (this != null && this is DistributedMap<*, *>) this
+fun <T : Map<String, JsonNode>> T.toDistributedMap(): DistributedMap<String, JsonNode> {
+ return if (this != null && this is DistributedMap<*, *>) this as DistributedMap<String, JsonNode>
else throw BluePrintProcessorException("map is not of type DistributedMap")
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
index b5ec6dd5c..0690eb89d 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
@@ -55,10 +55,12 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
when (membershipEvent.type()) {
ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}")
ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
else -> log.info("Member event unknown")
}
}
+ /** Start and Join the Cluster */
atomix.start().join()
log.info(
"Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -83,6 +85,19 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
return atomix.isRunning
}
+ override suspend fun masterMember(partitionGroup: String): ClusterMember {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ check(atomix.isRunning) { "cluster is not running" }
+ val masterId = atomix.partitionService
+ .getPartitionGroup(partitionGroup)
+ .getPartition("1").primary()
+ val masterMember = atomix.membershipService.getMember(masterId)
+ return ClusterMember(
+ id = masterMember.id().id(),
+ memberAddress = masterMember.address().toString()
+ )
+ }
+
override suspend fun allMembers(): Set<ClusterMember> {
check(::atomix.isInitialized) { "failed to start and join cluster" }
check(atomix.isRunning) { "cluster is not running" }
@@ -90,7 +105,7 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
return atomix.membershipService.members.map {
ClusterMember(
id = it.id().id(),
- memberAddress = it.host()
+ memberAddress = it.address().toString()
)
}.toSet()
}
@@ -153,4 +168,10 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String)
override fun isLocked(): Boolean {
return distributedLock.isLocked
}
+
+ override fun close() {
+ if (::distributedLock.isInitialized) {
+ distributedLock.close()
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index b25706972..39453fc7a 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -17,7 +17,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
import com.fasterxml.jackson.databind.JsonNode
-import io.atomix.core.Atomix
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
@@ -27,7 +26,7 @@ import kotlinx.coroutines.withContext
import org.junit.Before
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
@@ -45,12 +44,25 @@ class AtomixBluePrintClusterServiceTest {
}
}
- /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/
+ /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/
@Test
fun testClusterJoin() {
runBlocking {
- val members = arrayListOf("node-5679", "node-5680")
- val deferred = arrayListOf(5679, 5680).map { port ->
+ val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680))
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682))
+ val bluePrintClusterService = bluePrintClusterServiceOne.get(0)
+ log.info("Members : ${bluePrintClusterService.allMembers()}")
+ log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
+ log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
+ testDistributedStore(bluePrintClusterServiceOne)
+ testDistributedLock(bluePrintClusterServiceOne)
+ }
+ }
+
+ private suspend fun createCluster(ports: List<Int>): List<BluePrintClusterService> {
+ return withContext(Dispatchers.Default) {
+ val members = ports.map { "node-$it" }
+ val deferred = ports.map { port ->
async(Dispatchers.IO) {
val nodeId = "node-$port"
log.info("********** Starting node($nodeId) on port($port)")
@@ -60,43 +72,46 @@ class AtomixBluePrintClusterServiceTest {
)
val atomixClusterService = AtomixBluePrintClusterService()
atomixClusterService.startCluster(clusterInfo)
- atomixClusterService.atomix
+ atomixClusterService
}
}
- val atomixs = deferred.awaitAll()
- testDistributedStore(atomixs)
- testDistributedLock(atomixs)
+ deferred.awaitAll()
}
}
- private suspend fun testDistributedStore(atomix: List<Atomix>) {
+ private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
/** Test Distributed store creation */
repeat(2) { storeId ->
- val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+ val store = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ "blueprint-runtime-$storeId"
+ ).toDistributedMap()
assertNotNull(store, "failed to get store")
- val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+ val store1 = bluePrintClusterServices.get(0).clusterMapStore<JsonNode>(
+ "blueprint-runtime-$storeId"
+ ).toDistributedMap()
+
store1.addListener {
log.info("Received map event : $it")
}
- repeat(10) {
+ repeat(5) {
store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
}
- delay(100)
+ delay(10)
store.close()
}
}
- private suspend fun testDistributedLock(atomix: List<Atomix>) {
+ private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
val lockName = "sample-lock"
withContext(Dispatchers.IO) {
val deferred = async {
- executeLock(atomix.get(0), "first", lockName)
+ executeLock(bluePrintClusterServices.get(0), "first", lockName)
}
val deferred2 = async {
- executeLock(atomix.get(1), "second", lockName)
+ executeLock(bluePrintClusterServices.get(0), "second", lockName)
}
val deferred3 = async {
- executeLock(atomix.get(1), "third", lockName)
+ executeLock(bluePrintClusterServices.get(0), "third", lockName)
}
deferred.start()
deferred2.start()
@@ -104,17 +119,21 @@ class AtomixBluePrintClusterServiceTest {
}
}
- private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+ private suspend fun executeLock(
+ bluePrintClusterService: BluePrintClusterService,
+ lockId: String,
+ lockName: String
+ ) {
log.info("initialising $lockId lock...")
- val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+ val distributedLock = bluePrintClusterService.clusterLock(lockName)
assertNotNull(distributedLock, "failed to create distributed $lockId lock")
distributedLock.lock()
- assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+ assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
try {
log.info("locked $lockId process for 5mSec")
delay(5)
} finally {
- distributedLock.unlock()
+ distributedLock.unLock()
log.info("$lockId lock released")
}
distributedLock.close()
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
index f1c625e8f..016d48636 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{100} - %msg%n</pattern>
</encoder>
</appender>