diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/atomix-lib/src')
3 files changed, 131 insertions, 31 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt index 27921be9d..b5ec6dd5c 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt @@ -18,10 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service import io.atomix.cluster.ClusterMembershipEvent import io.atomix.core.Atomix +import io.atomix.core.lock.DistributedLock import kotlinx.coroutines.delay import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.stereotype.Service @@ -35,8 +37,6 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { lateinit var atomix: Atomix - private var joined = false - override suspend fun startCluster(clusterInfo: ClusterInfo) { log.info( "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + @@ -53,10 +53,10 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { /** Listen for the member chaneg events */ atomix.membershipService.addListener { membershipEvent -> when (membershipEvent.type()) { - ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("***** New Member Added") - ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("***** Member Removed") - ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("***** Metadata Changed Removed") - else -> log.info("***** Member event unknown") + ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}") + ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}") + else -> log.info("Member event unknown") } } atomix.start().join() @@ -77,16 +77,16 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { "ping", "ping from node(${clusterInfo.nodeId})" ) - joined = true } override fun clusterJoined(): Boolean { - return joined + return atomix.isRunning } override suspend fun allMembers(): Set<ClusterMember> { check(::atomix.isInitialized) { "failed to start and join cluster" } check(atomix.isRunning) { "cluster is not running" } + return atomix.membershipService.members.map { ClusterMember( id = it.id().id(), @@ -106,13 +106,51 @@ open class AtomixBluePrintClusterService : BluePrintClusterService { } override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { + check(::atomix.isInitialized) { "failed to start and join cluster" } return AtomixLibUtils.distributedMapStore<T>(atomix, name) } + /** The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. * + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::atomix.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(atomix, name) + } + override suspend fun shutDown(duration: Duration) { - val shutDownMilli = duration.toMillis() - log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") - delay(shutDownMilli) - atomix.stop() + if (::atomix.isInitialized) { + val shutDownMilli = duration.toMillis() + log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") + delay(shutDownMilli) + atomix.stop() + } + } +} + +open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { + + lateinit var distributedLock: DistributedLock + + override suspend fun lock() { + distributedLock = AtomixLibUtils.distributedLock(atomix, name) + distributedLock.lock() + } + + override suspend fun tryLock(timeout: Long): Boolean { + distributedLock = AtomixLibUtils.distributedLock(atomix, name) + return distributedLock.tryLock(Duration.ofMillis(timeout)) + } + + override suspend fun unLock() { + distributedLock.unlock() + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt index 6e726a1a6..a2a0d3902 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.node.MissingNode import com.fasterxml.jackson.databind.node.NullNode import com.fasterxml.jackson.databind.node.ObjectNode import io.atomix.core.Atomix +import io.atomix.core.lock.DistributedLock import io.atomix.core.map.DistributedMap import io.atomix.protocols.backup.MultiPrimaryProtocol import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup @@ -29,28 +30,34 @@ import io.atomix.protocols.raft.partition.RaftPartitionGroup import io.atomix.utils.net.Address import org.jsoup.nodes.TextNode import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile object AtomixLibUtils { + private val log = logger(AtomixLibUtils::class) fun configAtomix(filePath: String): Atomix { val configFile = normalizedFile(filePath) return Atomix.builder(configFile.absolutePath).build() } - fun defaultMulticastAtomix(clusterInfo: ClusterInfo): Atomix { + fun defaultMulticastAtomix( + clusterInfo: ClusterInfo, + raftPartitions: Int = 1, + primaryBackupPartitions: Int = 32 + ): Atomix { val nodeId = clusterInfo.nodeId val raftPartitionGroup = RaftPartitionGroup.builder("system") - .withNumPartitions(7) + .withNumPartitions(raftPartitions) .withMembers(clusterInfo.clusterMembers) .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) .build() val primaryBackupGroup = PrimaryBackupPartitionGroup.builder("data") - .withNumPartitions(31) + .withNumPartitions(primaryBackupPartitions) .build() return Atomix.builder() @@ -62,11 +69,11 @@ object AtomixLibUtils { .build() } - fun <T> distributedMapStore(atomix: Atomix, storeName: String): DistributedMap<String, T> { + fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> { check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } val protocol = MultiPrimaryProtocol.builder() - .withBackups(2) + .withBackups(numBackups) .build() return atomix.mapBuilder<String, T>(storeName) @@ -79,4 +86,17 @@ object AtomixLibUtils { ) .build() } + + fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { + check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } + + val protocol = MultiPrimaryProtocol.builder() + .withBackups(numBackups) + .build() + + val lock = atomix.lockBuilder(lockName) + .withProtocol(protocol) + .build() + return lock + } } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt index 919d6712b..b25706972 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt @@ -17,11 +17,13 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix import com.fasterxml.jackson.databind.JsonNode +import io.atomix.core.Atomix import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import org.junit.Before import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService @@ -31,6 +33,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir import org.onap.ccsdk.cds.controllerblueprints.core.logger import kotlin.test.assertNotNull +import kotlin.test.assertTrue class AtomixBluePrintClusterServiceTest { val log = logger(AtomixBluePrintClusterServiceTest::class) @@ -60,21 +63,60 @@ class AtomixBluePrintClusterServiceTest { atomixClusterService.atomix } } - val atomix = deferred.awaitAll() - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") - assertNotNull(store, "failed to get store") - val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") - store1.addListener { - log.info("Received map event : $it") - } - repeat(10) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - delay(100) - store.close() + val atomixs = deferred.awaitAll() + testDistributedStore(atomixs) + testDistributedLock(atomixs) + } + } + + private suspend fun testDistributedStore(atomix: List<Atomix>) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(0), "blueprint-runtime-$storeId") + assertNotNull(store, "failed to get store") + val store1 = AtomixLibUtils.distributedMapStore<JsonNode>(atomix.get(1), "blueprint-runtime-$storeId") + store1.addListener { + log.info("Received map event : $it") + } + repeat(10) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() } + delay(100) + store.close() + } + } + + private suspend fun testDistributedLock(atomix: List<Atomix>) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + executeLock(atomix.get(0), "first", lockName) + } + val deferred2 = async { + executeLock(atomix.get(1), "second", lockName) + } + val deferred3 = async { + executeLock(atomix.get(1), "third", lockName) + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock(atomix: Atomix, lockId: String, lockName: String) { + log.info("initialising $lockId lock...") + val distributedLock = AtomixLibUtils.distributedLock(atomix, lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked, "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unlock() + log.info("$lockId lock released") } + distributedLock.close() } } |