From 42b3d82ccbe1f0cdf7fa46f605d9f5a2fd96364d Mon Sep 17 00:00:00 2001 From: Jozsef Csongvai Date: Tue, 23 Jun 2020 08:48:30 -0400 Subject: Fix hazelcast issues - confined lock tests to individual threads to ensure correct unlocking - removed silent failure in clusterlock.unlock function when unlock is called by a thread that doesnt own the lock. - added isLockedByCurrentThread method to ClusterLock interface - disabled multicast discovery, tcp-ip should be more stable for tests - fix Hazlecast typo Issue-ID: CCSDK-2429 Signed-off-by: Jozsef Csongvai Change-Id: Idfe723fff04fcd9c48510cf429eb15b33662c49d --- .../core/cluster/HazelcastClusterServiceTest.kt | 236 +++++++++++++++++++++ .../core/cluster/HazlecastClusterServiceTest.kt | 231 -------------------- .../resources/hazelcast/hazelcast-cluster.yaml | 7 +- 3 files changed, 242 insertions(+), 232 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt (limited to 'ms/blueprintsprocessor/modules/commons/processor-core/src/test') diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt new file mode 100644 index 000000000..e214b6593 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -0,0 +1,236 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.fasterxml.jackson.databind.JsonNode +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.instance.impl.HazelcastInstanceFactory +import com.hazelcast.map.IMap +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.After +import org.junit.Before +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import java.io.Serializable +import java.util.Properties +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +class HazelcastClusterServiceTest { + private val log = logger(HazelcastClusterServiceTest::class) + private val clusterSize = 3 + + @Before + @After + fun killAllHazelcastInstances() { + HazelcastInstanceFactory.terminateAll() + } + + @Test + fun testClientFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + System.setProperty( + "hazelcast.client.config", + normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath + ) + val config = YamlClientConfigBuilder().build() + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testServerFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml") + val config = FileSystemYamlConfig(configFile) + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testClusterJoin() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(1, 2, 3)).toMutableList() + printReachableMembers(bluePrintClusterServiceOne) + testDistributedStore(bluePrintClusterServiceOne) + testDistributedLock(bluePrintClusterServiceOne) + } + } + + private suspend fun createCluster( + ids: List, + joinAsClient: Boolean? = false + ): List { + + return withContext(Dispatchers.Default) { + val deferred = ids.map { id -> + async(Dispatchers.IO) { + val nodeId = "node-$id" + log.info("********** Starting ($nodeId)") + val properties = Properties() + properties["hazelcast.logging.type"] = "slf4j" + val clusterInfo = + if (joinAsClient!!) { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = true, + configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml", + properties = properties + ) + } else { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = false, + configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml", + properties = properties + ) + } + val hazelcastClusterService = HazelcastClusterService() + hazelcastClusterService.startCluster(clusterInfo) + hazelcastClusterService + } + } + deferred.awaitAll() + } + } + + private suspend fun testDistributedStore(bluePrintClusterServices: List) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = bluePrintClusterServices[0].clusterMapStore( + "blueprint-runtime-$storeId" + ) as IMap + assertNotNull(store, "failed to get store") + repeat(5) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + + val store1 = bluePrintClusterServices[1].clusterMapStore( + "blueprint-runtime-$storeId" + ) as IMap + + store1.values.map { + log.trace("Received map event : $it") + } + delay(5) + store.clear() + } + } + + private suspend fun testDistributedLock(bluePrintClusterServices: List) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + newSingleThreadContext("first").use { + withContext(it) { + executeLock(bluePrintClusterServices[0], "first", lockName) + } + } + } + val deferred2 = async { + newSingleThreadContext("second").use { + withContext(it) { + executeLock(bluePrintClusterServices[1], "second", lockName) + } + } + } + val deferred3 = async { + newSingleThreadContext("third").use { + withContext(it) { + executeLock(bluePrintClusterServices[2], "third", lockName) + } + } + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock( + bluePrintClusterService: BluePrintClusterService, + lockId: String, + lockName: String + ) { + log.info("initialising $lockId lock...") + val distributedLock = bluePrintClusterService.clusterLock(lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked(), "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unLock() + log.info("$lockId lock released") + } + distributedLock.close() + } + + private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { + log.info("initialising ...") + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService + + val memberNameMap = bluePrintClusterService.clusterMapStore("member-name-map") as IMap + assertEquals(3, memberNameMap.size, "failed to match member size") + memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } + val scheduler = hazelcastClusterService.clusterScheduler("cleanup") + // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) + // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) + // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) + // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS) + } + + private suspend fun printReachableMembers(bluePrintClusterServices: List) { + bluePrintClusterServices.forEach { bluePrintClusterService -> + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService + val hazelcast = hazelcastClusterService.hazelcast + val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null + val master = hazelcastClusterService.masterMember("system").memberAddress + val members = hazelcastClusterService.allMembers().map { it.memberAddress } + log.info("Cluster Members for($self): master($master) Members($members)") + } + + val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-") + assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") + log.info("Cluster applicationMembers ($applicationMembers)") + } +} + +open class SampleSchedulerTask : Runnable, Serializable { + private val log = logger(SampleSchedulerTask::class) + override fun run() { + log.info("I am scheduler action") + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt deleted file mode 100644 index 80cf41558..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster - -import com.fasterxml.jackson.databind.JsonNode -import com.hazelcast.client.config.YamlClientConfigBuilder -import com.hazelcast.cluster.Member -import com.hazelcast.config.FileSystemYamlConfig -import com.hazelcast.map.IMap -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile -import java.io.Serializable -import java.time.Duration -import java.util.Properties -import kotlin.test.assertEquals -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -class HazlecastClusterServiceTest { - private val log = logger(HazlecastClusterServiceTest::class) - private val clusterSize = 3 - - @Test - fun testClientFileSystemYamlConfig() { - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") - System.setProperty( - "hazelcast.client.config", - normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath - ) - val config = YamlClientConfigBuilder().build() - assertNotNull(config) - assertEquals("test-cluster", config.clusterName) - assertEquals("node-1234", config.instanceName) - } - - @Test - fun testServerFileSystemYamlConfig() { - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") - val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml") - val config = FileSystemYamlConfig(configFile) - assertNotNull(config) - assertEquals("test-cluster", config.clusterName) - assertEquals("node-1234", config.instanceName) - } - - @Test - fun testClusterJoin() { - runBlocking { - val bluePrintClusterServiceOne = - createCluster(arrayListOf(5679, 5680, 5681)).toMutableList() - // delay(1000) - // Join as Hazlecast Management Node - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false) - // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) - printReachableMembers(bluePrintClusterServiceOne) - testDistributedStore(bluePrintClusterServiceOne) - testDistributedLock(bluePrintClusterServiceOne) - - // executeScheduler(bluePrintClusterServiceOne[0]) - // delay(1000) - // Shutdown - shutdown(bluePrintClusterServiceOne) - } - } - - private suspend fun createCluster( - ports: List, - joinAsClient: Boolean? = false - ): List { - - return withContext(Dispatchers.Default) { - val deferred = ports.map { port -> - async(Dispatchers.IO) { - val nodeId = "node-$port" - log.info("********** Starting node($nodeId) on port($port)") - val properties = Properties() - properties["hazelcast.logging.type"] = "slf4j" - val clusterInfo = - if (joinAsClient!!) { - ClusterInfo( - id = "test-cluster", nodeId = nodeId, joinAsClient = true, - configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml", - properties = properties - ) - } else { - ClusterInfo( - id = "test-cluster", nodeId = nodeId, joinAsClient = false, - configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml", - properties = properties - ) - } - val hazlecastClusterService = HazlecastClusterService() - hazlecastClusterService.startCluster(clusterInfo) - hazlecastClusterService - } - } - deferred.awaitAll() - } - } - - private suspend fun shutdown(bluePrintClusterServices: List) { - bluePrintClusterServices.forEach { bluePrintClusterService -> - bluePrintClusterService.shutDown(Duration.ofMillis(10)) - } - } - - private suspend fun testDistributedStore(bluePrintClusterServices: List) { - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = bluePrintClusterServices[0].clusterMapStore( - "blueprint-runtime-$storeId" - ) as IMap - assertNotNull(store, "failed to get store") - repeat(5) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - - val store1 = bluePrintClusterServices[1].clusterMapStore( - "blueprint-runtime-$storeId" - ) as IMap - - store1.values.map { - log.trace("Received map event : $it") - } - delay(5) - store.clear() - } - } - - private suspend fun testDistributedLock(bluePrintClusterServices: List) { - val lockName = "sample-lock" - withContext(Dispatchers.IO) { - val deferred = async { - executeLock(bluePrintClusterServices[0], "first", lockName) - } - val deferred2 = async { - executeLock(bluePrintClusterServices[1], "second", lockName) - } - val deferred3 = async { - executeLock(bluePrintClusterServices[2], "third", lockName) - } - deferred.start() - deferred2.start() - deferred3.start() - } - } - - private suspend fun executeLock( - bluePrintClusterService: BluePrintClusterService, - lockId: String, - lockName: String - ) { - log.info("initialising $lockId lock...") - val distributedLock = bluePrintClusterService.clusterLock(lockName) - assertNotNull(distributedLock, "failed to create distributed $lockId lock") - distributedLock.lock() - assertTrue(distributedLock.isLocked(), "failed to lock $lockId") - try { - log.info("locked $lockId process for 5mSec") - delay(5) - } finally { - distributedLock.unLock() - log.info("$lockId lock released") - } - distributedLock.close() - } - - private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { - log.info("initialising ...") - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService - - val memberNameMap = bluePrintClusterService.clusterMapStore("member-name-map") as IMap - assertEquals(3, memberNameMap.size, "failed to match member size") - memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } - val scheduler = hazlecastClusterService.clusterScheduler("cleanup") - // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) - // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) - // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) - // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS) - } - - private suspend fun printReachableMembers(bluePrintClusterServices: List) { - bluePrintClusterServices.forEach { bluePrintClusterService -> - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService - val hazelcast = hazlecastClusterService.hazelcast - val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null - val master = hazlecastClusterService.masterMember("system").memberAddress - val members = hazlecastClusterService.allMembers().map { it.memberAddress } - log.info("Cluster Members for($self): master($master) Members($members)") - } - - val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56") - assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") - log.info("Cluster applicationMembers ($applicationMembers)") - } -} - -open class SampleSchedulerTask : Runnable, Serializable { - private val log = logger(SampleSchedulerTask::class) - override fun run() { - log.info("I am scheduler action") - } -} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml index de6047a90..b4dc3454a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml @@ -7,10 +7,15 @@ hazelcast: session-time-to-live-seconds: 60 session-heartbeat-interval-seconds: 5 missing-cp-member-auto-removal-seconds: 120 + metrics: + enabled: false network: join: - multicast: + tcp-ip: enabled: true + interface: 127.0.0.1 + multicast: + enabled: false # Specify 224.0.0.1 instead of default 224.2.2.3 since there's some issue # on macOs with docker installed and multicast address different than 224.0.0.1 # https://stackoverflow.com/questions/46341715/hazelcast-multicast-does-not-work-because-of-vboxnet-which-is-used-by-docker-mac -- cgit 1.2.3-korg