aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt62
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt30
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt70
-rw-r--r--ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt43
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt12
5 files changed, 183 insertions, 34 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
index 27921be9d..b5ec6dd5c 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
@@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
import io.atomix.cluster.ClusterMembershipEvent
import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
import kotlinx.coroutines.delay
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.stereotype.Service
@@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
lateinit var atomix: Atomix
- private var joined = false
-
override suspend fun startCluster(clusterInfo: ClusterInfo) {
log.info(
"Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
@@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
/** Listen for the member chaneg events */
atomix.membershipService.addListener { membershipEvent ->
when (membershipEvent.type()) {
- ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added")
- ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed")
- ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed")
- else -> log.info("***** Member event unknown")
+ ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
+ ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
+ else -> log.info("Member event unknown")
}
}
atomix.start().join()
@@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
"ping",
"ping from node(${clusterInfo.nodeId})"
)
- joined = true
}
override fun clusterJoined(): Boolean {
- return joined
+ return atomix.isRunning
}
override suspend fun allMembers(): Set<ClusterMember> {
check(::atomix.isInitialized) { "failed to start and join cluster" }
check(atomix.isRunning) { "cluster is not running" }
+
return atomix.membershipService.members.map {
ClusterMember(
id = it.id().id(),
@@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService {
}
override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
return AtomixLibUtils.distributedMapStore<T>(atomix, name)
}
+ /** The DistributedLock is a distributed implementation of Java’s Lock.
+ * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+ * determine ordering of multiple concurrent lock holders.
+ * DistributedLocks are designed to account for failures within the cluster.
+ * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+ * the lock will be released and granted to the next waiting process. *
+ */
+ override suspend fun clusterLock(name: String): ClusterLock {
+ check(::atomix.isInitialized) { "failed to start and join cluster" }
+ return ClusterLockImpl(atomix, name)
+ }
+
override suspend fun shutDown(duration: Duration) {
- val shutDownMilli = duration.toMillis()
- log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
- delay(shutDownMilli)
- atomix.stop()
+ if (::atomix.isInitialized) {
+ val shutDownMilli = duration.toMillis()
+ log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
+ delay(shutDownMilli)
+ atomix.stop()
+ }
+ }
+}
+
+open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
+
+ lateinit var distributedLock: DistributedLock
+
+ override suspend fun lock() {
+ distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+ distributedLock.lock()
+ }
+
+ override suspend fun tryLock(timeout: Long): Boolean {
+ distributedLock = AtomixLibUtils.distributedLock(atomix, name)
+ return distributedLock.tryLock(Duration.ofMillis(timeout))
+ }
+
+ override suspend fun unLock() {
+ distributedLock.unlock()
+ }
+
+ override fun isLocked(): Boolean {
+ return distributedLock.isLocked
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
index 6e726a1a6..a2a0d3902 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.atomix.core.Atomix
+import io.atomix.core.lock.DistributedLock
import io.atomix.core.map.DistributedMap
import io.atomix.protocols.backup.MultiPrimaryProtocol
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
@@ -29,28 +30,34 @@ import io.atomix.protocols.raft.partition.RaftPartitionGroup
import io.atomix.utils.net.Address
import org.jsoup.nodes.TextNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
object AtomixLibUtils {
+ private val log = logger(AtomixLibUtils::class)
fun configAtomix(filePath: String): Atomix {
val configFile = normalizedFile(filePath)
return Atomix.builder(configFile.absolutePath).build()
}
- fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix {
+ fun defaultMulticastAtomix(
+ clusterInfo: ClusterInfo,
+ raftPartitions: Int = 1,
+ primaryBackupPartitions: Int = 32
+ ): Atomix {
val nodeId = clusterInfo.nodeId
val raftPartitionGroup = RaftPartitionGroup.builder("system")
- .withNumPartitions(7)
+ .withNumPartitions(raftPartitions)
.withMembers(clusterInfo.clusterMembers)
.withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
.build()
val primaryBackupGroup =
PrimaryBackupPartitionGroup.builder("data")
- .withNumPartitions(31)
+ .withNumPartitions(primaryBackupPartitions)
.build()
return Atomix.builder()
@@ -62,11 +69,11 @@ object AtomixLibUtils {
.build()
}
- fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> {
+ fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
val protocol = MultiPrimaryProtocol.builder()
- .withBackups(2)
+ .withBackups(numBackups)
.build()
return atomix.mapBuilder<String, T>(storeName)
@@ -79,4 +86,17 @@ object AtomixLibUtils {
)
.build()
}
+
+ fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
+ check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
+
+ val protocol = MultiPrimaryProtocol.builder()
+ .withBackups(numBackups)
+ .build()
+
+ val lock = atomix.lockBuilder(lockName)
+ .withProtocol(protocol)
+ .build()
+ return lock
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
index 919d6712b..b25706972 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
@@ -17,11 +17,13 @@
package org.onap.ccsdk.cds.blueprintsprocessor.atomix
import com.fasterxml.jackson.databind.JsonNode
+import io.atomix.core.Atomix
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
import org.junit.Before
import org.junit.Test
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
@@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
class AtomixBluePrintClusterServiceTest {
val log = logger(AtomixBluePrintClusterServiceTest::class)
@@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest {
atomixClusterService.atomix
}
}
- val atomix = deferred.awaitAll()
- /** Test Distributed store creation */
- repeat(2) { storeId ->
- val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
- assertNotNull(store, "failed to get store")
- val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
- store1.addListener {
- log.info("Received map event : $it")
- }
- repeat(10) {
- store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
- }
- delay(100)
- store.close()
+ val atomixs = deferred.awaitAll()
+ testDistributedStore(atomixs)
+ testDistributedLock(atomixs)
+ }
+ }
+
+ private suspend fun testDistributedStore(atomix: List<Atomix>) {
+ /** Test Distributed store creation */
+ repeat(2) { storeId ->
+ val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId")
+ assertNotNull(store, "failed to get store")
+ val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId")
+ store1.addListener {
+ log.info("Received map event : $it")
+ }
+ repeat(10) {
+ store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
}
+ delay(100)
+ store.close()
+ }
+ }
+
+ private suspend fun testDistributedLock(atomix: List<Atomix>) {
+ val lockName = "sample-lock"
+ withContext(Dispatchers.IO) {
+ val deferred = async {
+ executeLock(atomix.get(0), "first", lockName)
+ }
+ val deferred2 = async {
+ executeLock(atomix.get(1), "second", lockName)
+ }
+ val deferred3 = async {
+ executeLock(atomix.get(1), "third", lockName)
+ }
+ deferred.start()
+ deferred2.start()
+ deferred3.start()
+ }
+ }
+
+ private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) {
+ log.info("initialising $lockId lock...")
+ val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName)
+ assertNotNull(distributedLock, "failed to create distributed $lockId lock")
+ distributedLock.lock()
+ assertTrue(distributedLock.isLocked, "failed to lock $lockId")
+ try {
+ log.info("locked $lockId process for 5mSec")
+ delay(5)
+ } finally {
+ distributedLock.unlock()
+ log.info("$lockId lock released")
}
+ distributedLock.close()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
index 976f9f5e8..549be6481 100644
--- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt
@@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
import kotlin.test.assertNotNull
@@ -106,6 +107,7 @@ class BluePrintNatsServiceTest {
testMultiPublish(natsService)
testLoadBalance(natsService)
+ testLimitSubscription(natsService)
testRequestReply(natsService)
testMultiRequestReply(natsService)
delay(1000)
@@ -137,12 +139,49 @@ class BluePrintNatsServiceTest {
val lbMessageHandler2 =
MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") }
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
- natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
+ val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1)
+ val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2)
repeat(5) {
natsService.publish("lb-publish", "lb publish message-$it".toByteArray())
}
+ sub1.unsubscribe()
+ sub2.unsubscribe()
+ }
+ }
+
+ private fun testLimitSubscription(natsService: BluePrintNatsService) {
+ runBlocking {
+ /** Load balance Publish Message Test **/
+ val lbMessageHandler1 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 1: ${message.strData()}")
+ message.ack()
+ }
+ }
+ val lbMessageHandler2 =
+ MessageHandler { message ->
+ runBlocking {
+ println("LB Publish Message Handler 2: ${message.strData()}")
+ message.ack()
+ }
+ }
+
+ val sub1 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler1,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+ val sub2 = natsService.loadBalanceSubscribe(
+ "lb-publish", "lb-group", lbMessageHandler2,
+ SubscriptionOptionsUtils.manualAckWithRateLimit(1)
+ )
+
+ repeat(10) {
+ natsService.publish("lb-publish", "lb limit message-$it".toByteArray())
+ }
+ sub1.unsubscribe()
+ sub2.unsubscribe()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index bbaa427c9..6fd443624 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -35,6 +35,9 @@ interface BluePrintClusterService {
/** Create and get or get the distributed data map store with [name] */
suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
+ /** Create and get the distributed lock with [name] */
+ suspend fun clusterLock(name: String): ClusterLock
+
/** Shut down the cluster with [duration] */
suspend fun shutDown(duration: Duration)
}
@@ -48,4 +51,11 @@ data class ClusterInfo(
var storagePath: String
)
-data class ClusterMember(val id: String, val memberAddress: String?)
+data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
+
+interface ClusterLock {
+ suspend fun lock()
+ suspend fun tryLock(timeout: Long): Boolean
+ suspend fun unLock()
+ fun isLocked(): Boolean
+}