diff options
10 files changed, 217 insertions, 70 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml index f43f19c52..a37089f10 100644 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml @@ -20,20 +20,20 @@ services: image: nats-streaming:latest container_name: nats hostname: nats - command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-1 --cluster_node_id nats-1" + command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0" networks: - cds-network ports: - "8222:8222" - "4222:4222" restart: always - cds-controller-1: + cds-controller-0: depends_on: - db - nats image: onap/ccsdk-blueprintsprocessor:latest - container_name: cds-controller-1 - hostname: cds-controller-1 + container_name: cds-controller-0 + hostname: cds-controller-0 networks: - cds-network ports: @@ -49,9 +49,10 @@ services: source: ./config environment: # Same as hostname and container name + CLUSTER_ENABLED: "true" CLUSTER_ID: cds-cluster - CLUSTER_NODE_ID: cds-controller-1 - CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1 + CLUSTER_NODE_ID: cds-controller-0 + CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0 CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf NATS_HOSTS: nats://nats:4222 @@ -60,13 +61,13 @@ services: APP_CONFIG_HOME: /opt/app/onap/config STICKYSELECTORKEY: ENVCONTEXT: dev - resource-resolution-1: + resource-resolution-0: depends_on: - db - nats image: onap/ccsdk-blueprintsprocessor:latest - container_name: resource-resolution-1 - hostname: resource-resolution-1 + container_name: resource-resolution-0 + hostname: resource-resolution-0 networks: - cds-network ports: @@ -81,9 +82,10 @@ services: type: bind source: ./config environment: + CLUSTER_ENABLED: "true" CLUSTER_ID: cds-cluster - CLUSTER_NODE_ID: resource-resolution-1 - CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1 + CLUSTER_NODE_ID: resource-resolution-0 + CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0 CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf NATS_HOSTS: nats://nats:4222 diff --git a/ms/blueprintsprocessor/application/src/main/docker/startService.sh b/ms/blueprintsprocessor/application/src/main/docker/startService.sh index f5967dcb4..f516a3c57 100644 --- a/ms/blueprintsprocessor/application/src/main/docker/startService.sh +++ b/ms/blueprintsprocessor/application/src/main/docker/startService.sh @@ -18,10 +18,4 @@ exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sour -Djava.security.egd=file:/dev/./urandom \ -DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \ -Dspring.config.location=${APP_CONFIG_HOME}/ \ --DCLUSTER_ID=${CLUSTER_ID} \ --DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \ --DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \ --DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \ --DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \ --DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \ org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt index b78ebf68b..4c9314ec2 100644 --- a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt @@ -22,6 +22,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Component @@ -52,7 +54,7 @@ import javax.annotation.PreDestroy * CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster * CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf * 4. Cluster will be enabled only all the above properties present in the environments. - * if CLUSTER_ID is present, then it will try to create cluster. + * if CLUSTER_ENABLED is present, then it will try to create cluster. */ @Component open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) { @@ -61,25 +63,22 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr @EventListener(ApplicationReadyEvent::class) fun startAndJoinCluster() = runBlocking { - val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) - if (!clusterId.isNullOrEmpty()) { + if (BluePrintConstants.CLUSTER_ENABLED) { - val nodeId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) - ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ID}") + val clusterId = ClusterUtils.clusterId() + val nodeId = ClusterUtils.clusterNodeId() + val nodeAddress = ClusterUtils.clusterNodeAddress() - val nodeAddress = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS) - ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS}") - - val clusterMembers = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS) + val clusterMembers = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS) ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_MEMBERS}") - val clusterMemberList = clusterMembers.split(",").map { it.trim() }.toList() + val clusterMemberList = clusterMembers.splitCommaAsList() - val clusterStorage = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH) + val clusterStorage = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH) ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH}") - val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE) + val clusterConfigFile = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE) val clusterInfo = ClusterInfo( id = clusterId, nodeId = nodeId, @@ -89,10 +88,7 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr ) bluePrintClusterService.startCluster(clusterInfo) } else { - log.info( - "Cluster is disabled, to enable cluster set the environment " + - "properties[CLUSTER_ID,CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS,CLUSTER_CONFIG_FILE]" - ) + log.info("Cluster is disabled, to enable cluster set the environment CLUSTER_* properties.") } } diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index caf063161..571f0a176 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -27,7 +27,7 @@ object BluePrintConstants { val APP_NAME = System.getProperty("APPLICATION_NAME") ?: System.getProperty("APP_NAME") ?: System.getProperty("APPNAME") - ?: "cds-controller-default" + ?: "cds-controller" const val DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" @@ -219,6 +219,7 @@ object BluePrintConstants { val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean() /** Cluster Properties */ + val CLUSTER_ENABLED = (System.getenv("CLUSTER_ENABLED") ?: "false").toBoolean() const val PROPERTY_CLUSTER_ID = "CLUSTER_ID" const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID" const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS" diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt index d5ffe6b7f..b52cd711b 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt @@ -28,10 +28,15 @@ object ClusterUtils { } fun clusterId(): String { - return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster" + return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster" } fun clusterNodeId(): String { - return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller" + return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller-0" + } + + fun clusterNodeAddress(): String { + return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS) + ?: clusterNodeId() } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index 27921be9d..b5ec6dd5c 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service import io.atomix.cluster.ClusterMembershipEvent import io.atomix.core.Atomix +import io.atomix.core.lock.DistributedLock import kotlinx.coroutines.delay import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.stereotype.Service @@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { lateinit var atomix: Atomix - private var joined = false - override suspend fun startCluster(clusterInfo: ClusterInfo) { log.info( "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + @@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { /** Listen for the member chaneg events */ atomix.membershipService.addListener { membershipEvent -> when (membershipEvent.type()) { - ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added") - ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed") - ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed") - else -> log.info("***** Member event unknown") + ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}") + else -> log.info("Member event unknown") } } atomix.start().join() @@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { "ping", "ping from node(${clusterInfo.nodeId})" ) - joined = true } override fun clusterJoined(): Boolean { - return joined + return atomix.isRunning } override suspend fun allMembers(): Set<ClusterMember> { check(::atomix.isInitialized) { "failed to start and join cluster" } check(atomix.isRunning) { "cluster is not running" } + return atomix.membershipService.members.map { ClusterMember( id = it.id().id(), @@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { } override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { + check(::atomix.isInitialized) { "failed to start and join cluster" } return AtomixLibUtils.distributedMapStore<T>(atomix, name) } + /** The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. * + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::atomix.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(atomix, name) + } + override suspend fun shutDown(duration: Duration) { - val shutDownMilli = duration.toMillis() - log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") - delay(shutDownMilli) - atomix.stop() + if (::atomix.isInitialized) { + val shutDownMilli = duration.toMillis() + log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") + delay(shutDownMilli) + atomix.stop() + } + } +} + +open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { + + lateinit var distributedLock: DistributedLock + + override suspend fun lock() { + distributedLock = AtomixLibUtils.distributedLock(atomix, name) + distributedLock.lock() + } + + override suspend fun tryLock(timeout: Long): Boolean { + distributedLock = AtomixLibUtils.distributedLock(atomix, name) + return distributedLock.tryLock(Duration.ofMillis(timeout)) + } + + override suspend fun unLock() { + distributedLock.unlock() + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt index 6e726a1a6..a2a0d3902 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import io.atomix.core.Atomix +import io.atomix.core.lock.DistributedLock import io.atomix.core.map.DistributedMap import io.atomix.protocols.backup.MultiPrimaryProtocol import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup @@ -29,28 +30,34 @@ import io.atomix.protocols.raft.partition.RaftPartitionGroup import io.atomix.utils.net.Address import org.jsoup.nodes.TextNode import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile object AtomixLibUtils { + private val log = logger(AtomixLibUtils::class) fun configAtomix(filePath: String): Atomix { val configFile = normalizedFile(filePath) return Atomix.builder(configFile.absolutePath).build() } - fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix { + fun defaultMulticastAtomix( + clusterInfo: ClusterInfo, + raftPartitions: Int = 1, + primaryBackupPartitions: Int = 32 + ): Atomix { val nodeId = clusterInfo.nodeId val raftPartitionGroup = RaftPartitionGroup.builder("system") - .withNumPartitions(7) + .withNumPartitions(raftPartitions) .withMembers(clusterInfo.clusterMembers) .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) .build() val primaryBackupGroup = PrimaryBackupPartitionGroup.builder("data") - .withNumPartitions(31) + .withNumPartitions(primaryBackupPartitions) .build() return Atomix.builder() @@ -62,11 +69,11 @@ object AtomixLibUtils { .build() } - fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> { + fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> { check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } val protocol = MultiPrimaryProtocol.builder() - .withBackups(2) + .withBackups(numBackups) .build() return atomix.mapBuilder<String, T>(storeName) @@ -79,4 +86,17 @@ object AtomixLibUtils { ) .build() } + + fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { + check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(numBackups) + .build() + + val lock = atomix.lockBuilder(lockName) + .withProtocol(protocol) + .build() + return lock + } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index 919d6712b..b25706972 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -17,11 +17,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix import com.fasterxml.jackson.databind.JsonNode +import io.atomix.core.Atomix import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import org.junit.Before import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService @@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir import org.onap.ccsdk.cds.controllerblueprints.core.logger import kotlin.test.assertNotNull +import kotlin.test.assertTrue class AtomixBluePrintClusterServiceTest { val log = logger(AtomixBluePrintClusterServiceTest::class) @@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest { atomixClusterService.atomix } } - val atomix = deferred.awaitAll() - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") - assertNotNull(store, "failed to get store") - val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") - store1.addListener { - log.info("Received map event : $it") - } - repeat(10) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - delay(100) - store.close() + val atomixs = deferred.awaitAll() + testDistributedStore(atomixs) + testDistributedLock(atomixs) + } + } + + private suspend fun testDistributedStore(atomix: List<Atomix>) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") + assertNotNull(store, "failed to get store") + val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") + store1.addListener { + log.info("Received map event : $it") + } + repeat(10) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() } + delay(100) + store.close() + } + } + + private suspend fun testDistributedLock(atomix: List<Atomix>) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + executeLock(atomix.get(0), "first", lockName) + } + val deferred2 = async { + executeLock(atomix.get(1), "second", lockName) + } + val deferred3 = async { + executeLock(atomix.get(1), "third", lockName) + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) { + log.info("initialising $lockId lock...") + val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked, "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unlock() + log.info("$lockId lock released") } + distributedLock.close() } } diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt index 976f9f5e8..549be6481 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import kotlin.test.assertNotNull @@ -106,6 +107,7 @@ class BluePrintNatsServiceTest { testMultiPublish(natsService) testLoadBalance(natsService) + testLimitSubscription(natsService) testRequestReply(natsService) testMultiRequestReply(natsService) delay(1000) @@ -137,12 +139,49 @@ class BluePrintNatsServiceTest { val lbMessageHandler2 = MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") } - natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) - natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) + val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) + val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) repeat(5) { natsService.publish("lb-publish", "lb publish message-$it".toByteArray()) } + sub1.unsubscribe() + sub2.unsubscribe() + } + } + + private fun testLimitSubscription(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 1: ${message.strData()}") + message.ack() + } + } + val lbMessageHandler2 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 2: ${message.strData()}") + message.ack() + } + } + + val sub1 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler1, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + val sub2 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler2, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + + repeat(10) { + natsService.publish("lb-publish", "lb limit message-$it".toByteArray()) + } + sub1.unsubscribe() + sub2.unsubscribe() } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index bbaa427c9..6fd443624 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -35,6 +35,9 @@ interface BluePrintClusterService { /** Create and get or get the distributed data map store with [name] */ suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> + /** Create and get the distributed lock with [name] */ + suspend fun clusterLock(name: String): ClusterLock + /** Shut down the cluster with [duration] */ suspend fun shutDown(duration: Duration) } @@ -48,4 +51,11 @@ data class ClusterInfo( var storagePath: String ) -data class ClusterMember(val id: String, val memberAddress: String?) +data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) + +interface ClusterLock { + suspend fun lock() + suspend fun tryLock(timeout: Long): Boolean + suspend fun unLock() + fun isLocked(): Boolean +} |