From 3579d15b41d9f786650f76c3b6a98d28f0052f1a Mon Sep 17 00:00:00 2001 From: Jozsef Csongvai Date: Mon, 15 Jun 2020 08:42:08 -0400 Subject: Implement nodetemplate locking feature Enables locking execution of a nodetemplate using a lock key and lock acquire timeout. Issue-ID: CCSDK-2460 Change-Id: I308d4d89dab44b7f7a766d5b62258e67b051eab1 Signed-off-by: Jozsef Csongvai --- .../core/cluster/BluePrintClusterExtensions.kt | 29 ++++++++++++++++++++++ .../core/cluster/HazlecastClusterService.kt | 10 +++----- .../core/service/BluePrintClusterService.kt | 2 ++ 3 files changed, 35 insertions(+), 6 deletions(-) (limited to 'ms/blueprintsprocessor/modules/commons/processor-core/src/main') diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt index 85d9d5c27..81fc0d709 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -17,9 +17,15 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster import com.hazelcast.cluster.Member +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService /** @@ -44,3 +50,26 @@ fun Member.toClusterMember(): ClusterMember { memberAddress = this.address.toString() ) } + +/** + * This function will try to acquire the lock and then execute the provided block. + * If the lock cannot be acquired within timeout, a BlueprintException will be thrown. + * + * Since a lock can only be unlocked by the the thread which acquired the lock, + * this function will confine coroutines within the block to a dedicated thread. + */ +suspend fun ClusterLock.executeWithLock(acquireLockTimeout: Long, block: suspend () -> R): R { + val lock = this + return newSingleThreadContext(lock.name()).use { + withContext(GlobalScope.coroutineContext[MDCContext]?.plus(it) ?: it) { + if (lock.tryLock(acquireLockTimeout)) { + try { + block() + } finally { + lock.unLock() + } + } else + throw BluePrintException("Failed to acquire lock within timeout") + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt index a58c077fa..feb2a8e2a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt @@ -133,7 +133,7 @@ open class HazlecastClusterService : BluePrintClusterService { } override fun clusterJoined(): Boolean { - return hazelcast.lifecycleService.isRunning + return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning } override suspend fun masterMember(partitionGroup: String): ClusterMember { @@ -225,21 +225,21 @@ open class BlueprintsClusterMembershipListener() : open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { private val log = logger(ClusterLockImpl::class) - lateinit var distributedLock: FencedLock + private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name) override fun name(): String { return distributedLock.name } override suspend fun lock() { - distributedLock = hazelcast.cpSubsystem.getLock(name) distributedLock.lock() log.trace("Cluster lock($name) created..") } override suspend fun tryLock(timeout: Long): Boolean { - distributedLock = hazelcast.cpSubsystem.getLock(name) return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) + .also { if (it) log.trace("Cluster lock acquired: $name") + else log.trace("Failed to acquire Cluster lock $name within timeout $timeout") } } override suspend fun unLock() { @@ -255,14 +255,12 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val } override suspend fun fenceLock(): String { - distributedLock = hazelcast.cpSubsystem.getLock(name) val fence = distributedLock.lockAndGetFence() log.trace("Cluster lock($name) fence($fence) created..") return fence.toString() } override suspend fun tryFenceLock(timeout: Long): String { - distributedLock = hazelcast.cpSubsystem.getLock(name) return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 53f18d38a..9725553a5 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -80,3 +80,5 @@ interface ClusterLock { fun isLocked(): Boolean fun close() } + +const val CDS_LOCK_GROUP = "cds-lock" -- cgit 1.2.3-korg