From 730c940a84b9056fed993ccef08dc5ec4053db21 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 26 Dec 2019 15:15:27 -0500 Subject: Cluster partition master API Added cluster lock close method. Added NATS connection name and default services. Junit test cases improvements. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: I44c7c1aaeae2ddbf768903152fb00bc160172fc1 --- .../atomix/BluePrintAtomixLibExtensions.kt | 5 +- .../service/AtomixBluePrintClusterService.kt | 23 +++++++- .../atomix/AtomixBluePrintClusterServiceTest.kt | 63 ++++++++++++++-------- .../atomix-lib/src/test/resources/logback-test.xml | 2 +- .../nats/BluePrintNatsLibConfiguration.kt | 6 +++ .../nats/BluePrintNatsLibData.kt | 2 + .../nats/service/TLSAuthNatsService.kt | 1 + .../nats/service/TokenAuthNatsService.kt | 1 + .../core/service/BluePrintClusterService.kt | 4 ++ 9 files changed, 81 insertions(+), 26 deletions(-) diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt index 696d728dd..17d243620 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt @@ -16,10 +16,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix +import com.fasterxml.jackson.databind.JsonNode import io.atomix.core.map.DistributedMap import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -fun > T.toDistributedMap(): DistributedMap<*, *> { - return if (this != null && this is DistributedMap<*, *>) this +fun > T.toDistributedMap(): DistributedMap { + return if (this != null && this is DistributedMap<*, *>) this as DistributedMap else throw BluePrintProcessorException("map is not of type DistributedMap") } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index b5ec6dd5c..0690eb89d 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -55,10 +55,12 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { when (membershipEvent.type()) { ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}") ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}") ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}") else -> log.info("Member event unknown") } } + /** Start and Join the Cluster */ atomix.start().join() log.info( "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + @@ -83,6 +85,19 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { return atomix.isRunning } + override suspend fun masterMember(partitionGroup: String): ClusterMember { + check(::atomix.isInitialized) { "failed to start and join cluster" } + check(atomix.isRunning) { "cluster is not running" } + val masterId = atomix.partitionService + .getPartitionGroup(partitionGroup) + .getPartition("1").primary() + val masterMember = atomix.membershipService.getMember(masterId) + return ClusterMember( + id = masterMember.id().id(), + memberAddress = masterMember.address().toString() + ) + } + override suspend fun allMembers(): Set { check(::atomix.isInitialized) { "failed to start and join cluster" } check(atomix.isRunning) { "cluster is not running" } @@ -90,7 +105,7 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { return atomix.membershipService.members.map { ClusterMember( id = it.id().id(), - memberAddress = it.host() + memberAddress = it.address().toString() ) }.toSet() } @@ -153,4 +168,10 @@ open class ClusterLockImpl(private val atomix: Atomix, private val name: String) override fun isLocked(): Boolean { return distributedLock.isLocked } + + override fun close() { + if (::distributedLock.isInitialized) { + distributedLock.close() + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index b25706972..39453fc7a 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -17,7 +17,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix import com.fasterxml.jackson.databind.JsonNode -import io.atomix.core.Atomix import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll @@ -27,7 +26,7 @@ import kotlinx.coroutines.withContext import org.junit.Before import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir @@ -45,12 +44,25 @@ class AtomixBluePrintClusterServiceTest { } } - /** Testing two cluster with distributed map store creation, This is time consuming test casetake around 10s **/ + /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/ @Test fun testClusterJoin() { runBlocking { - val members = arrayListOf("node-5679", "node-5680") - val deferred = arrayListOf(5679, 5680).map { port -> + val bluePrintClusterServiceOne = createCluster(arrayListOf(5679, 5680)) + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682)) + val bluePrintClusterService = bluePrintClusterServiceOne.get(0) + log.info("Members : ${bluePrintClusterService.allMembers()}") + log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}") + log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}") + testDistributedStore(bluePrintClusterServiceOne) + testDistributedLock(bluePrintClusterServiceOne) + } + } + + private suspend fun createCluster(ports: List): List { + return withContext(Dispatchers.Default) { + val members = ports.map { "node-$it" } + val deferred = ports.map { port -> async(Dispatchers.IO) { val nodeId = "node-$port" log.info("********** Starting node($nodeId) on port($port)") @@ -60,43 +72,46 @@ class AtomixBluePrintClusterServiceTest { ) val atomixClusterService = AtomixBluePrintClusterService() atomixClusterService.startCluster(clusterInfo) - atomixClusterService.atomix + atomixClusterService } } - val atomixs = deferred.awaitAll() - testDistributedStore(atomixs) - testDistributedLock(atomixs) + deferred.awaitAll() } } - private suspend fun testDistributedStore(atomix: List) { + private suspend fun testDistributedStore(bluePrintClusterServices: List) { /** Test Distributed store creation */ repeat(2) { storeId -> - val store = AtomixLibUtils.distributedMapStore(atomix.get(0), "blueprint-runtime-$storeId") + val store = bluePrintClusterServices.get(0).clusterMapStore( + "blueprint-runtime-$storeId" + ).toDistributedMap() assertNotNull(store, "failed to get store") - val store1 = AtomixLibUtils.distributedMapStore(atomix.get(1), "blueprint-runtime-$storeId") + val store1 = bluePrintClusterServices.get(0).clusterMapStore( + "blueprint-runtime-$storeId" + ).toDistributedMap() + store1.addListener { log.info("Received map event : $it") } - repeat(10) { + repeat(5) { store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() } - delay(100) + delay(10) store.close() } } - private suspend fun testDistributedLock(atomix: List) { + private suspend fun testDistributedLock(bluePrintClusterServices: List) { val lockName = "sample-lock" withContext(Dispatchers.IO) { val deferred = async { - executeLock(atomix.get(0), "first", lockName) + executeLock(bluePrintClusterServices.get(0), "first", lockName) } val deferred2 = async { - executeLock(atomix.get(1), "second", lockName) + executeLock(bluePrintClusterServices.get(0), "second", lockName) } val deferred3 = async { - executeLock(atomix.get(1), "third", lockName) + executeLock(bluePrintClusterServices.get(0), "third", lockName) } deferred.start() deferred2.start() @@ -104,17 +119,21 @@ class AtomixBluePrintClusterServiceTest { } } - private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) { + private suspend fun executeLock( + bluePrintClusterService: BluePrintClusterService, + lockId: String, + lockName: String + ) { log.info("initialising $lockId lock...") - val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName) + val distributedLock = bluePrintClusterService.clusterLock(lockName) assertNotNull(distributedLock, "failed to create distributed $lockId lock") distributedLock.lock() - assertTrue(distributedLock.isLocked, "failed to lock $lockId") + assertTrue(distributedLock.isLocked(), "failed to lock $lockId") try { log.info("locked $lockId process for 5mSec") delay(5) } finally { - distributedLock.unlock() + distributedLock.unLock() log.info("$lockId lock released") } distributedLock.close() diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml index f1c625e8f..016d48636 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml @@ -19,7 +19,7 @@ - %d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{100} - %msg%n diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt index 709ee7d6e..147d360ba 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.nats import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration @@ -31,9 +32,14 @@ open class BluePrintNatsLibConfiguration fun BluePrintDependencyService.natsLibPropertyService(): BluePrintNatsLibPropertyService = instance(NatsLibConstants.SERVICE_BLUEPRINT_NATS_LIB_PROPERTY) +fun BluePrintDependencyService.controllerNatsService(): BluePrintNatsService { + return natsLibPropertyService().bluePrintNatsService(NatsLibConstants.DEFULT_NATS_SELECTOR) +} + class NatsLibConstants { companion object { const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service" + const val DEFULT_NATS_SELECTOR = "cds-controller" const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats." const val TYPE_TOKEN_AUTH = "token-auth" const val TYPE_TLS_AUTH = "tls-auth" diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt index 3329ec200..9767ac29d 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt @@ -23,6 +23,8 @@ open class NatsConnectionProperties { var clusterId: String = ClusterUtils.clusterId() var clientId: String = ClusterUtils.clusterNodeId() lateinit var host: String + /** Rest endpoint selector to access Monitoring API */ + var monitoringSelector: String? = null } open class TokenAuthNatsConnectionProperties : NatsConnectionProperties() { diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt index 3781fae59..00a972eff 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TLSAuthNatsService.kt @@ -34,6 +34,7 @@ open class TLSAuthNatsService(private val natsConnectionProperties: TLSAuthNatsC val serverList = natsConnectionProperties.host.splitCommaAsList() val options = Options.Builder() + .connectionName(natsConnectionProperties.clientId) .servers(serverList.toTypedArray()) // .sslContext(sslContext()) .build() diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt index 0da3022ff..60b7934ba 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt @@ -33,6 +33,7 @@ open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthN val serverList = natsConnectionProperties.host.splitCommaAsList() val options = Options.Builder() + .connectionName(natsConnectionProperties.clientId) .servers(serverList.toTypedArray()) .token(natsConnectionProperties.token.toCharArray()) .build() diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 6fd443624..21fcfc509 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -26,6 +26,9 @@ interface BluePrintClusterService { fun clusterJoined(): Boolean + /** Returns [partitionGroup] master member */ + suspend fun masterMember(partitionGroup: String): ClusterMember + /** Returns all the data cluster members */ suspend fun allMembers(): Set @@ -58,4 +61,5 @@ interface ClusterLock { suspend fun tryLock(timeout: Long): Boolean suspend fun unLock() fun isLocked(): Boolean + fun close() } -- cgit 1.2.3-korg