diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
4 files changed, 109 insertions, 6 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt index 85d9d5c27..81fc0d709 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -17,9 +17,15 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster import com.hazelcast.cluster.Member +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService /** @@ -44,3 +50,26 @@ fun Member.toClusterMember(): ClusterMember { memberAddress = this.address.toString() ) } + +/** + * This function will try to acquire the lock and then execute the provided block. + * If the lock cannot be acquired within timeout, a BlueprintException will be thrown. + * + * Since a lock can only be unlocked by the the thread which acquired the lock, + * this function will confine coroutines within the block to a dedicated thread. + */ +suspend fun <R> ClusterLock.executeWithLock(acquireLockTimeout: Long, block: suspend () -> R): R { + val lock = this + return newSingleThreadContext(lock.name()).use { + withContext(GlobalScope.coroutineContext[MDCContext]?.plus(it) ?: it) { + if (lock.tryLock(acquireLockTimeout)) { + try { + block() + } finally { + lock.unLock() + } + } else + throw BluePrintException("Failed to acquire lock within timeout") + } + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt index a58c077fa..feb2a8e2a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt @@ -133,7 +133,7 @@ open class HazlecastClusterService : BluePrintClusterService { } override fun clusterJoined(): Boolean { - return hazelcast.lifecycleService.isRunning + return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning } override suspend fun masterMember(partitionGroup: String): ClusterMember { @@ -225,21 +225,21 @@ open class BlueprintsClusterMembershipListener() : open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { private val log = logger(ClusterLockImpl::class) - lateinit var distributedLock: FencedLock + private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name) override fun name(): String { return distributedLock.name } override suspend fun lock() { - distributedLock = hazelcast.cpSubsystem.getLock(name) distributedLock.lock() log.trace("Cluster lock($name) created..") } override suspend fun tryLock(timeout: Long): Boolean { - distributedLock = hazelcast.cpSubsystem.getLock(name) return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) + .also { if (it) log.trace("Cluster lock acquired: $name") + else log.trace("Failed to acquire Cluster lock $name within timeout $timeout") } } override suspend fun unLock() { @@ -255,14 +255,12 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val } override suspend fun fenceLock(): String { - distributedLock = hazelcast.cpSubsystem.getLock(name) val fence = distributedLock.lockAndGetFence() log.trace("Cluster lock($name) fence($fence) created..") return fence.toString() } override suspend fun tryFenceLock(timeout: Long): String { - distributedLock = hazelcast.cpSubsystem.getLock(name) return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 53f18d38a..9725553a5 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -80,3 +80,5 @@ interface ClusterLock { fun isLocked(): Boolean fun close() } + +const val CDS_LOCK_GROUP = "cds-lock" diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt new file mode 100644 index 000000000..7ef4eb49b --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright © 2019 Bell Canada. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.runBlocking +import org.junit.Before +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import java.lang.RuntimeException +import kotlin.test.assertEquals + +class BluePrintClusterExtensionsTest { + + private lateinit var clusterLockMock: ClusterLock + + @Before + fun setup() { + clusterLockMock = mockk() + every { clusterLockMock.name() } returns "mock-lock" + } + + @Test + fun `executeWithLock - should call unlock and return block result`() { + runBlocking { + every { runBlocking { clusterLockMock.tryLock(more(0L)) } } returns true + every { runBlocking { clusterLockMock.unLock() } } returns Unit + + val result = clusterLockMock.executeWithLock(1_000) { "result" } + + verify { runBlocking { clusterLockMock.unLock() } } + assertEquals("result", result) + } + } + + @Test + fun `executeWithLock - should call unlock even when block throws exception`() { + runBlocking { + every { runBlocking { clusterLockMock.tryLock(more(0L)) } } returns true + every { runBlocking { clusterLockMock.unLock() } } returns Unit + + try { + clusterLockMock.executeWithLock(1_000) { throw RuntimeException("It crashed") } + } catch (e: Exception) { } + + verify { runBlocking { clusterLockMock.unLock() } } + } + } + + @Test(expected = BluePrintException::class) + fun `executeWithLock - should throw exception when lock was not acquired within timeout`() { + runBlocking { + every { runBlocking { clusterLockMock.tryLock(eq(0L)) } } returns false + clusterLockMock.executeWithLock(0) { "Will not run" } + } + } +} |