diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules')
7 files changed, 192 insertions, 37 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index caf063161..571f0a176 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -27,7 +27,7 @@ object BluePrintConstants { val APP_NAME = System.getProperty("APPLICATION_NAME") ?: System.getProperty("APP_NAME") ?: System.getProperty("APPNAME") - ?: "cds-controller-default" + ?: "cds-controller" const val DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" @@ -219,6 +219,7 @@ object BluePrintConstants { val USE_SCRIPT_COMPILE_CACHE: Boolean = (System.getenv("USE_SCRIPT_COMPILE_CACHE") ?: "true").toBoolean() /** Cluster Properties */ + val CLUSTER_ENABLED = (System.getenv("CLUSTER_ENABLED") ?: "false").toBoolean() const val PROPERTY_CLUSTER_ID = "CLUSTER_ID" const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID" const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS" diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt index d5ffe6b7f..b52cd711b 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt @@ -28,10 +28,15 @@ object ClusterUtils { } fun clusterId(): String { - return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster" + return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster" } fun clusterNodeId(): String { - return System.getProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller" + return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: "cds-controller-0" + } + + fun clusterNodeAddress(): String { + return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_NODE_ADDRESS) + ?: clusterNodeId() } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index 27921be9d..b5ec6dd5c 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service import io.atomix.cluster.ClusterMembershipEvent import io.atomix.core.Atomix +import io.atomix.core.lock.DistributedLock import kotlinx.coroutines.delay import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.stereotype.Service @@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { lateinit var atomix: Atomix - private var joined = false - override suspend fun startCluster(clusterInfo: ClusterInfo) { log.info( "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + @@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { /** Listen for the member chaneg events */ atomix.membershipService.addListener { membershipEvent -> when (membershipEvent.type()) { - ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added") - ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed") - ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed") - else -> log.info("***** Member event unknown") + ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}") + else -> log.info("Member event unknown") } } atomix.start().join() @@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { "ping", "ping from node(${clusterInfo.nodeId})" ) - joined = true } override fun clusterJoined(): Boolean { - return joined + return atomix.isRunning } override suspend fun allMembers(): Set<ClusterMember> { check(::atomix.isInitialized) { "failed to start and join cluster" } check(atomix.isRunning) { "cluster is not running" } + return atomix.membershipService.members.map { ClusterMember( id = it.id().id(), @@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { } override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { + check(::atomix.isInitialized) { "failed to start and join cluster" } return AtomixLibUtils.distributedMapStore<T>(atomix, name) } + /** The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. * + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::atomix.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(atomix, name) + } + override suspend fun shutDown(duration: Duration) { - val shutDownMilli = duration.toMillis() - log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") - delay(shutDownMilli) - atomix.stop() + if (::atomix.isInitialized) { + val shutDownMilli = duration.toMillis() + log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") + delay(shutDownMilli) + atomix.stop() + } + } +} + +open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { + + lateinit var distributedLock: DistributedLock + + override suspend fun lock() { + distributedLock = AtomixLibUtils.distributedLock(atomix, name) + distributedLock.lock() + } + + override suspend fun tryLock(timeout: Long): Boolean { + distributedLock = AtomixLibUtils.distributedLock(atomix, name) + return distributedLock.tryLock(Duration.ofMillis(timeout)) + } + + override suspend fun unLock() { + distributedLock.unlock() + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt index 6e726a1a6..a2a0d3902 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import io.atomix.core.Atomix +import io.atomix.core.lock.DistributedLock import io.atomix.core.map.DistributedMap import io.atomix.protocols.backup.MultiPrimaryProtocol import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup @@ -29,28 +30,34 @@ import io.atomix.protocols.raft.partition.RaftPartitionGroup import io.atomix.utils.net.Address import org.jsoup.nodes.TextNode import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile object AtomixLibUtils { + private val log = logger(AtomixLibUtils::class) fun configAtomix(filePath: String): Atomix { val configFile = normalizedFile(filePath) return Atomix.builder(configFile.absolutePath).build() } - fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix { + fun defaultMulticastAtomix( + clusterInfo: ClusterInfo, + raftPartitions: Int = 1, + primaryBackupPartitions: Int = 32 + ): Atomix { val nodeId = clusterInfo.nodeId val raftPartitionGroup = RaftPartitionGroup.builder("system") - .withNumPartitions(7) + .withNumPartitions(raftPartitions) .withMembers(clusterInfo.clusterMembers) .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) .build() val primaryBackupGroup = PrimaryBackupPartitionGroup.builder("data") - .withNumPartitions(31) + .withNumPartitions(primaryBackupPartitions) .build() return Atomix.builder() @@ -62,11 +69,11 @@ object AtomixLibUtils { .build() } - fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> { + fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> { check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } val protocol = MultiPrimaryProtocol.builder() - .withBackups(2) + .withBackups(numBackups) .build() return atomix.mapBuilder<String, T>(storeName) @@ -79,4 +86,17 @@ object AtomixLibUtils { ) .build() } + + fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { + check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(numBackups) + .build() + + val lock = atomix.lockBuilder(lockName) + .withProtocol(protocol) + .build() + return lock + } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index 919d6712b..b25706972 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -17,11 +17,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix import com.fasterxml.jackson.databind.JsonNode +import io.atomix.core.Atomix import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import org.junit.Before import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService @@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir import org.onap.ccsdk.cds.controllerblueprints.core.logger import kotlin.test.assertNotNull +import kotlin.test.assertTrue class AtomixBluePrintClusterServiceTest { val log = logger(AtomixBluePrintClusterServiceTest::class) @@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest { atomixClusterService.atomix } } - val atomix = deferred.awaitAll() - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") - assertNotNull(store, "failed to get store") - val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") - store1.addListener { - log.info("Received map event : $it") - } - repeat(10) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - delay(100) - store.close() + val atomixs = deferred.awaitAll() + testDistributedStore(atomixs) + testDistributedLock(atomixs) + } + } + + private suspend fun testDistributedStore(atomix: List<Atomix>) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") + assertNotNull(store, "failed to get store") + val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") + store1.addListener { + log.info("Received map event : $it") + } + repeat(10) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() } + delay(100) + store.close() + } + } + + private suspend fun testDistributedLock(atomix: List<Atomix>) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + executeLock(atomix.get(0), "first", lockName) + } + val deferred2 = async { + executeLock(atomix.get(1), "second", lockName) + } + val deferred3 = async { + executeLock(atomix.get(1), "third", lockName) + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) { + log.info("initialising $lockId lock...") + val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked, "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unlock() + log.info("$lockId lock released") } + distributedLock.close() } } diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt index 976f9f5e8..549be6481 100644 --- a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt @@ -27,6 +27,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsConnectionProperties import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties import org.onap.ccsdk.cds.blueprintsprocessor.nats.strData +import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import kotlin.test.assertNotNull @@ -106,6 +107,7 @@ class BluePrintNatsServiceTest { testMultiPublish(natsService) testLoadBalance(natsService) + testLimitSubscription(natsService) testRequestReply(natsService) testMultiRequestReply(natsService) delay(1000) @@ -137,12 +139,49 @@ class BluePrintNatsServiceTest { val lbMessageHandler2 = MessageHandler { message -> println("LB Publish Message Handler 2: ${message.strData()}") } - natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) - natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) + val sub1 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler1) + val sub2 = natsService.loadBalanceSubscribe("lb-publish", "lb-group", lbMessageHandler2) repeat(5) { natsService.publish("lb-publish", "lb publish message-$it".toByteArray()) } + sub1.unsubscribe() + sub2.unsubscribe() + } + } + + private fun testLimitSubscription(natsService: BluePrintNatsService) { + runBlocking { + /** Load balance Publish Message Test **/ + val lbMessageHandler1 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 1: ${message.strData()}") + message.ack() + } + } + val lbMessageHandler2 = + MessageHandler { message -> + runBlocking { + println("LB Publish Message Handler 2: ${message.strData()}") + message.ack() + } + } + + val sub1 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler1, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + val sub2 = natsService.loadBalanceSubscribe( + "lb-publish", "lb-group", lbMessageHandler2, + SubscriptionOptionsUtils.manualAckWithRateLimit(1) + ) + + repeat(10) { + natsService.publish("lb-publish", "lb limit message-$it".toByteArray()) + } + sub1.unsubscribe() + sub2.unsubscribe() } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index bbaa427c9..6fd443624 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -35,6 +35,9 @@ interface BluePrintClusterService { /** Create and get or get the distributed data map store with [name] */ suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> + /** Create and get the distributed lock with [name] */ + suspend fun clusterLock(name: String): ClusterLock + /** Shut down the cluster with [duration] */ suspend fun shutDown(duration: Duration) } @@ -48,4 +51,11 @@ data class ClusterInfo( var storagePath: String ) -data class ClusterMember(val id: String, val memberAddress: String?) +data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) + +interface ClusterLock { + suspend fun lock() + suspend fun tryLock(timeout: Long): Boolean + suspend fun unLock() + fun isLocked(): Boolean +} |