summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt29
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt10
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt74
4 files changed, 109 insertions, 6 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
index 85d9d5c27..81fc0d709 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
@@ -17,9 +17,15 @@
package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
import com.hazelcast.cluster.Member
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.newSingleThreadContext
+import kotlinx.coroutines.withContext
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
/**
@@ -44,3 +50,26 @@ fun Member.toClusterMember(): ClusterMember {
memberAddress = this.address.toString()
)
}
+
+/**
+ * This function will try to acquire the lock and then execute the provided block.
+ * If the lock cannot be acquired within timeout, a BlueprintException will be thrown.
+ *
+ * Since a lock can only be unlocked by the the thread which acquired the lock,
+ * this function will confine coroutines within the block to a dedicated thread.
+ */
+suspend fun <R> ClusterLock.executeWithLock(acquireLockTimeout: Long, block: suspend () -> R): R {
+ val lock = this
+ return newSingleThreadContext(lock.name()).use {
+ withContext(GlobalScope.coroutineContext[MDCContext]?.plus(it) ?: it) {
+ if (lock.tryLock(acquireLockTimeout)) {
+ try {
+ block()
+ } finally {
+ lock.unLock()
+ }
+ } else
+ throw BluePrintException("Failed to acquire lock within timeout")
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
index a58c077fa..feb2a8e2a 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
@@ -133,7 +133,7 @@ open class HazlecastClusterService : BluePrintClusterService {
}
override fun clusterJoined(): Boolean {
- return hazelcast.lifecycleService.isRunning
+ return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning
}
override suspend fun masterMember(partitionGroup: String): ClusterMember {
@@ -225,21 +225,21 @@ open class BlueprintsClusterMembershipListener() :
open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
private val log = logger(ClusterLockImpl::class)
- lateinit var distributedLock: FencedLock
+ private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name)
override fun name(): String {
return distributedLock.name
}
override suspend fun lock() {
- distributedLock = hazelcast.cpSubsystem.getLock(name)
distributedLock.lock()
log.trace("Cluster lock($name) created..")
}
override suspend fun tryLock(timeout: Long): Boolean {
- distributedLock = hazelcast.cpSubsystem.getLock(name)
return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
+ .also { if (it) log.trace("Cluster lock acquired: $name")
+ else log.trace("Failed to acquire Cluster lock $name within timeout $timeout") }
}
override suspend fun unLock() {
@@ -255,14 +255,12 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val
}
override suspend fun fenceLock(): String {
- distributedLock = hazelcast.cpSubsystem.getLock(name)
val fence = distributedLock.lockAndGetFence()
log.trace("Cluster lock($name) fence($fence) created..")
return fence.toString()
}
override suspend fun tryFenceLock(timeout: Long): String {
- distributedLock = hazelcast.cpSubsystem.getLock(name)
return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index 53f18d38a..9725553a5 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -80,3 +80,5 @@ interface ClusterLock {
fun isLocked(): Boolean
fun close()
}
+
+const val CDS_LOCK_GROUP = "cds-lock"
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt
new file mode 100644
index 000000000..7ef4eb49b
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensionsTest.kt
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2019 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import java.lang.RuntimeException
+import kotlin.test.assertEquals
+
+class BluePrintClusterExtensionsTest {
+
+ private lateinit var clusterLockMock: ClusterLock
+
+ @Before
+ fun setup() {
+ clusterLockMock = mockk()
+ every { clusterLockMock.name() } returns "mock-lock"
+ }
+
+ @Test
+ fun `executeWithLock - should call unlock and return block result`() {
+ runBlocking {
+ every { runBlocking { clusterLockMock.tryLock(more(0L)) } } returns true
+ every { runBlocking { clusterLockMock.unLock() } } returns Unit
+
+ val result = clusterLockMock.executeWithLock(1_000) { "result" }
+
+ verify { runBlocking { clusterLockMock.unLock() } }
+ assertEquals("result", result)
+ }
+ }
+
+ @Test
+ fun `executeWithLock - should call unlock even when block throws exception`() {
+ runBlocking {
+ every { runBlocking { clusterLockMock.tryLock(more(0L)) } } returns true
+ every { runBlocking { clusterLockMock.unLock() } } returns Unit
+
+ try {
+ clusterLockMock.executeWithLock(1_000) { throw RuntimeException("It crashed") }
+ } catch (e: Exception) { }
+
+ verify { runBlocking { clusterLockMock.unLock() } }
+ }
+ }
+
+ @Test(expected = BluePrintException::class)
+ fun `executeWithLock - should throw exception when lock was not acquired within timeout`() {
+ runBlocking {
+ every { runBlocking { clusterLockMock.tryLock(eq(0L)) } } returns false
+ clusterLockMock.executeWithLock(0) { "Will not run" }
+ }
+ }
+}