summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml24
-rw-r--r--ms/blueprintsprocessor/application/src/main/docker/startService.sh6
-rw-r--r--ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt28
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt3
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt62
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt30
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt70
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt43
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt12
10 files changed, 217 insertions, 70 deletions
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
index f43f19c52..a37089f10 100644
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
@@ -20,20 +20,20 @@ services:
image: nats-streaming:latest
container_name: nats
hostname: nats
- command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-1 --cluster_node_id nats-1"
+ command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0"
networks:
- cds-network
ports:
- "8222:8222"
- "4222:4222"
restart: always
- cds-controller-1:
+ cds-controller-0:
depends_on:
- db
- nats
image: onap/ccsdk-blueprintsprocessor:latest
- container_name: cds-controller-1
- hostname: cds-controller-1
+ container_name: cds-controller-0
+ hostname: cds-controller-0
networks:
- cds-network
ports:
@@ -49,9 +49,10 @@ services:
source: ./config
environment:
# Same as hostname and container name
+ CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
- CLUSTER_NODE_ID: cds-controller-1
- CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+ CLUSTER_NODE_ID: cds-controller-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
NATS_HOSTS: nats://nats:4222
@@ -60,13 +61,13 @@ services:
APP_CONFIG_HOME: /opt/app/onap/config
STICKYSELECTORKEY:
ENVCONTEXT: dev
- resource-resolution-1:
+ resource-resolution-0:
depends_on:
- db
- nats
image: onap/ccsdk-blueprintsprocessor:latest
- container_name: resource-resolution-1
- hostname: resource-resolution-1
+ container_name: resource-resolution-0
+ hostname: resource-resolution-0
networks:
- cds-network
ports:
@@ -81,9 +82,10 @@ services:
type: bind
source: ./config
environment:
+ CLUSTER_ENABLED: "true"
CLUSTER_ID: cds-cluster
- CLUSTER_NODE_ID: resource-resolution-1
- CLUSTER_MEMBERS: cds-controller-1,resource-resolution-1
+ CLUSTER_NODE_ID: resource-resolution-0
+ CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
#CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
NATS_HOSTS: nats://nats:4222
diff --git a/ms/blueprintsprocessor/application/src/main/docker/startService.sh b/ms/blueprintsprocessor/application/src/main/docker/startService.sh
index f5967dcb4..f516a3c57 100644
--- a/ms/blueprintsprocessor/application/src/main/docker/startService.sh
+++ b/ms/blueprintsprocessor/application/src/main/docker/startService.sh
@@ -18,10 +18,4 @@ exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sour
-Djava.security.egd=file:/dev/./urandom \
-DAPPNAME=${APPLICATIONNAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
-Dspring.config.location=${APP_CONFIG_HOME}/ \
--DCLUSTER_ID=${CLUSTER_ID} \
--DCLUSTER_NODE_ID=${CLUSTER_NODE_ID} \
--DCLUSTER_NODE_ADDRESS=${CLUSTER_NODE_ID} \
--DCLUSTER_MEMBERS=${CLUSTER_MEMBERS} \
--DCLUSTER_STORAGE_PATH=${CLUSTER_STORAGE_PATH} \
--DCLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} \
org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
index b78ebf68b..4c9314ec2 100644
--- a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
+++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/BluePrintProcessorCluster.kt
@@ -22,6 +22,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
@@ -52,7 +54,7 @@ import javax.annotation.PreDestroy
* CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
* CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
* 4. Cluster will be enabled only all the above properties present in the environments.
- * if CLUSTER_ID is present, then it will try to create cluster.
+ * if CLUSTER_ENABLED is present, then it will try to create cluster.
*/
@Component
open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePrintClusterService) {
@@ -61,25 +63,22 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr
@EventListener(ApplicationReadyEvent::class)
fun startAndJoinCluster() = runBlocking {
- val clusterId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID)
- if (!clusterId.isNullOrEmpty()) {
+ if (BluePrintConstants.CLUSTER_ENABLED) {
- val nodeId = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
- ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ID}")
+ val clusterId = ClusterUtils.clusterId()
+ val nodeId = ClusterUtils.clusterNodeId()
+ val nodeAddress = ClusterUtils.clusterNodeAddress()
- val nodeAddress = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
- ?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS}")
-
- val clusterMembers = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
+ val clusterMembers = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_MEMBERS)
?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_MEMBERS}")
- val clusterMemberList = clusterMembers.split(",").map { it.trim() }.toList()
+ val clusterMemberList = clusterMembers.splitCommaAsList()
- val clusterStorage = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
+ val clusterStorage = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH)
?: throw BluePrintProcessorException("couldn't get environment variable ${BluePrintConstants.PROPERTY_CLUSTER_STORAGE_PATH}")
- val clusterConfigFile = System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
+ val clusterConfigFile = System.getenv(BluePrintConstants.PROPERTY_CLUSTER_CONFIG_FILE)
val clusterInfo = ClusterInfo(
id = clusterId, nodeId = nodeId,
@@ -89,10 +88,7 @@ open class BluePrintProcessorCluster(private val bluePrintClusterService: BluePr
)
bluePrintClusterService.startCluster(clusterInfo)
} else {
- log.info(
- "Cluster is disabled, to enable cluster set the environment " +
- "properties[CLUSTER_ID,CLUSTER_NODE_ID, CLUSTER_NODE_ADDRESS, CLUSTER_MEMBERS,CLUSTER_CONFIG_FILE]"
- )
+ log.info("Cluster is disabled, to enable cluster set the environment CLUSTER_* properties.")
}
}
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index caf063161..571f0a176 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -27,7 +27,7 @@ object BluePrintConstants {
val APP_NAME = System.getProperty("APPLICATION_NAME")
?: System.getProperty("APP_NAME")
?: System.getProperty("APPNAME")
- ?: "cds-controller-default"
+ ?: "cds-controller"
const val DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
@@ -219,6 +219,7 @@ object BluePrintConstants {
val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean()
/** Cluster Properties */
+ val CLUSTER_ENABLED = (System.getenv("CLUSTER_ENABLED") ?: "false").toBoolean()
const val PROPERTY_CLUSTER_ID = "CLUSTER_ID"
const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID"
const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS"
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
index d5ffe6b7f..b52cd711b 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
@@ -28,10 +28,15 @@ object ClusterUtils {
}
fun clusterId(): String {
- return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
+ return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
}
fun clusterNodeId(): String {
- return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller"
+ return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller-0"
+ }
+
+ fun clusterNodeAddress(): String {
+ return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS)
+ ?: clusterNodeId()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
index 27921be9d..b5ec6dd5c 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
@@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
import io.atomix.cluster.ClusterMembershipEvent
import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
import kotlinx.coroutines.delay
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.stereotype.Service
@@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
lateinit var atomix: Atomix
- private var joined = false
-
override suspend fun startCluster(clusterInfo: ClusterInfo) {
log.info(
"Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
/** Listen for the member chaneg events */
atomix.membershipService.addListener { membershipEvent ->
when (membershipEvent.type()) {
- ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
- ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
- ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
- else -> log.info("***** Member event unknown")
+ ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
+ else -> log.info("Member event unknown")
}
}
atomix.start().join()
@@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
"ping",
"ping from node(${clusterInfo.nodeId})"
)
- joined = true
}
override fun clusterJoined(): Boolean {
- return joined
+ return atomix.isRunning
}
override suspend fun allMembers(): Set<ClusterMember> {
check(::atomix.isInitialized) { "failed to start and join cluster" }
check(atomix.isRunning) { "cluster is not running" }
+
return atomix.membershipService.members.map {
ClusterMember(
id = it.id().id(),
@@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
}
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
return AtomixLibUtils.distributedMapStore<T>(atomix, name)
}
+ /** The DistributedLock is a distributed implementation of Java’s Lock.
+ * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+ * determine ordering of multiple concurrent lock holders.
+ * DistributedLocks are designed to account for failures within the cluster.
+ * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+ * the lock will be released and granted to the next waiting process. *
+ */
+ override suspend fun clusterLock(name: String): ClusterLock {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ return ClusterLockImpl(atomix, name)
+ }
+
override suspend fun shutDown(duration: Duration) {
- val shutDownMilli = duration.toMillis()
- log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
- delay(shutDownMilli)
- atomix.stop()
+ if (::atomix.isInitialized) {
+ val shutDownMilli = duration.toMillis()
+ log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+ delay(shutDownMilli)
+ atomix.stop()
+ }
+ }
+}
+
+open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+
+ lateinit var distributedLock: DistributedLock
+
+ override suspend fun lock() {
+ distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+ distributedLock.lock()
+ }
+
+ override suspend fun tryLock(timeout: Long): Boolean {
+ distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+ return distributedLock.tryLock(Duration.ofMillis(timeout))
+ }
+
+ override suspend fun unLock() {
+ distributedLock.unlock()
+ }
+
+ override fun isLocked(): Boolean {
+ return distributedLock.isLocked
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index 6e726a1a6..a2a0d3902 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
@@ -29,28 +30,34 @@ import io.atomix.protocols.raft.partition.RaftPartitionGroup
import io.atomix.utils.net.Address
import org.jsoup.nodes.TextNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
object AtomixLibUtils {
+ private val log = logger(AtomixLibUtils::class)
fun configAtomix(filePath: String): Atomix {
val configFile = normalizedFile(filePath)
return Atomix.builder(configFile.absolutePath).build()
}
- fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+ fun defaultMulticastAtomix(
+ clusterInfo: ClusterInfo,
+ raftPartitions: Int = 1,
+ primaryBackupPartitions: Int = 32
+ ): Atomix {
val nodeId = clusterInfo.nodeId
val raftPartitionGroup = RaftPartitionGroup.builder("system")
- .withNumPartitions(7)
+ .withNumPartitions(raftPartitions)
.withMembers(clusterInfo.clusterMembers)
.withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
.build()
val primaryBackupGroup =
PrimaryBackupPartitionGroup.builder("data")
- .withNumPartitions(31)
+ .withNumPartitions(primaryBackupPartitions)
.build()
return Atomix.builder()
@@ -62,11 +69,11 @@ object AtomixLibUtils {
.build()
}
- fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+ fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
val protocol = MultiPrimaryProtocol.builder()
- .withBackups(2)
+ .withBackups(numBackups)
.build()
return atomix.mapBuilder<String, T>(storeName)
@@ -79,4 +86,17 @@ object AtomixLibUtils {
)
.build()
}
+
+ fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
+
+ val lock = atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ return lock
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 919d6712b..b25706972 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -17,11 +17,13 @@
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
import com.fasterxml.jackson.databind.JsonNode
+import io.atomix.core.Atomix
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
import org.junit.Before
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
@@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
val log = logger(AtomixBluePrintClusterServiceTest::class)
@@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest {
atomixClusterService.atomix
}
}
- val atomix = deferred.awaitAll()
- /** Test Distributed store creation */
- repeat(2) { storeId ->
- val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
- assertNotNull(store, "failed to get store")
- val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
- store1.addListener {
- log.info("Received map event : $it")
- }
- repeat(10) {
- store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
- }
- delay(100)
- store.close()
+ val atomixs = deferred.awaitAll()
+ testDistributedStore(atomixs)
+ testDistributedLock(atomixs)
+ }
+ }
+
+ private suspend fun testDistributedStore(atomix: List<Atomix>) {
+ /** Test Distributed store creation */
+ repeat(2) { storeId ->
+ val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+ assertNotNull(store, "failed to get store")
+ val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+ store1.addListener {
+ log.info("Received map event : $it")
+ }
+ repeat(10) {
+ store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
}
+ delay(100)
+ store.close()
+ }
+ }
+
+ private suspend fun testDistributedLock(atomix: List<Atomix>) {
+ val lockName = "sample-lock"
+ withContext(Dispatchers.IO) {
+ val deferred = async {
+ executeLock(atomix.get(0), "first", lockName)
+ }
+ val deferred2 = async {
+ executeLock(atomix.get(1), "second", lockName)
+ }
+ val deferred3 = async {
+ executeLock(atomix.get(1), "third", lockName)
+ }
+ deferred.start()
+ deferred2.start()
+ deferred3.start()
+ }
+ }
+
+ private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+ log.info("initialising $lockId lock...")
+ val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+ assertNotNull(distributedLock, "failed to create distributed $lockId lock")
+ distributedLock.lock()
+ assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+ try {
+ log.info("locked $lockId process for 5mSec")
+ delay(5)
+ } finally {
+ distributedLock.unlock()
+ log.info("$lockId lock released")
}
+ distributedLock.close()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
index 976f9f5e8..549be6481 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
@@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
import kotlin.test.assertNotNull
@@ -106,6 +107,7 @@ class BluePrintNatsServiceTest {
testMultiPublish(natsService)
testLoadBalance(natsService)
+ testLimitSubscription(natsService)
testRequestReply(natsService)
testMultiRequestReply(natsService)
delay(1000)
@@ -137,12 +139,49 @@ class BluePrintNatsServiceTest {
val lbMessageHandler2 =
MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+ val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+ val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
repeat(5) {
natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
}
+ sub1.unsubscribe()
+ sub2.unsubscribe()
+ }
+ }
+
+ private fun testLimitSubscription(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Load balance Publish Message Test **/
+ val lbMessageHandler1 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 1: ${message.strData()}")
+ message.ack()
+ }
+ }
+ val lbMessageHandler2 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 2: ${message.strData()}")
+ message.ack()
+ }
+ }
+
+ val sub1 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler1,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+ val sub2 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler2,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+
+ repeat(10) {
+ natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
+ }
+ sub1.unsubscribe()
+ sub2.unsubscribe()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index bbaa427c9..6fd443624 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -35,6 +35,9 @@ interface BluePrintClusterService {
/** Create and get or get the distributed data map store with [name] */
suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
+ /** Create and get the distributed lock with [name] */
+ suspend fun clusterLock(name: String): ClusterLock
+
/** Shut down the cluster with [duration] */
suspend fun shutDown(duration: Duration)
}
@@ -48,4 +51,11 @@ data class ClusterInfo(
var storagePath: String
)
-data class ClusterMember(val id: String, val memberAddress: String?)
+data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
+
+interface ClusterLock {
+ suspend fun lock()
+ suspend fun tryLock(timeout: Long): Boolean
+ suspend fun unLock()
+ fun isLocked(): Boolean
+}