From 42b3d82ccbe1f0cdf7fa46f605d9f5a2fd96364d Mon Sep 17 00:00:00 2001 From: Jozsef Csongvai Date: Tue, 23 Jun 2020 08:48:30 -0400 Subject: Fix hazelcast issues - confined lock tests to individual threads to ensure correct unlocking - removed silent failure in clusterlock.unlock function when unlock is called by a thread that doesnt own the lock. - added isLockedByCurrentThread method to ClusterLock interface - disabled multicast discovery, tcp-ip should be more stable for tests - fix Hazlecast typo Issue-ID: CCSDK-2429 Signed-off-by: Jozsef Csongvai Change-Id: Idfe723fff04fcd9c48510cf429eb15b33662c49d --- .../core/cluster/BluePrintClusterExtensions.kt | 4 +- .../core/cluster/HazelcastClusterService.kt | 270 +++++++++++++++++++++ .../core/cluster/HazelcastClusterUtils.kt | 117 +++++++++ .../core/cluster/HazlecastClusterService.kt | 269 -------------------- .../core/cluster/HazlecastClusterUtils.kt | 117 --------- .../core/service/BluePrintClusterService.kt | 1 + .../core/cluster/HazelcastClusterServiceTest.kt | 236 ++++++++++++++++++ .../core/cluster/HazlecastClusterServiceTest.kt | 231 ------------------ .../resources/hazelcast/hazelcast-cluster.yaml | 7 +- 9 files changed, 632 insertions(+), 620 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt delete mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt index 81fc0d709..0a58857f7 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -29,10 +29,10 @@ import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService /** - * Exposed Dependency Service by this Hazlecast Lib Module + * Exposed Dependency Service by this Hazelcast Lib Module */ fun BluePrintDependencyService.clusterService(): BluePrintClusterService = - instance(HazlecastClusterService::class) + instance(HazelcastClusterService::class) /** Optional Cluster Service, returns only if Cluster is enabled */ fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt new file mode 100644 index 000000000..d3c88d732 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt @@ -0,0 +1,270 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.client.HazelcastClient +import com.hazelcast.client.config.ClientConfig +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.cluster.MembershipEvent +import com.hazelcast.cluster.MembershipListener +import com.hazelcast.config.Config +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.config.MemberAttributeConfig +import com.hazelcast.core.Hazelcast +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.CPSubsystemManagementService +import com.hazelcast.cp.lock.FencedLock +import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.TimeUnit + +@Service +open class HazelcastClusterService : BluePrintClusterService { + + private val log = logger(HazelcastClusterService::class) + lateinit var hazelcast: HazelcastInstance + lateinit var cpSubsystemManagementService: CPSubsystemManagementService + var joinedClient = false + var joinedLite = false + + override suspend fun startCluster(configuration: T) { + /** Get the Hazelcast Client or Server instance */ + hazelcast = + when (configuration) { + is Config -> { + joinedLite = configuration.isLiteMember + val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration) + /** Promote as CP Member */ + promoteAsCPMember(hazelcastInstance) + hazelcastInstance + } + is ClientConfig -> { + joinedClient = true + HazelcastClient.newHazelcastClient(configuration) + } + is ClusterInfo -> { + + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id) + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId) + + val memberAttributeConfig = MemberAttributeConfig() + memberAttributeConfig.setAttribute( + BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, + configuration.nodeId + ) + + val configFile = configuration.configFile + /** Check file exists */ + val clusterConfigFile = normalizedFile(configuration.configFile) + check(clusterConfigFile.absolutePath.endsWith("yaml", true)) { + "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml" + } + check(clusterConfigFile.exists()) { + "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})" + } + log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****") + + /** Hazelcast Client from config file */ + if (configuration.joinAsClient) { + /** Set the configuration file to system properties, so that Hazelcast will read automatically */ + System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath) + joinedClient = true + val hazelcastClientConfiguration = YamlClientConfigBuilder().build() + hazelcastClientConfiguration.properties = configuration.properties + HazelcastClient.newHazelcastClient(hazelcastClientConfiguration) + } else { + /** Hazelcast Server from config file */ + val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile)) + hazelcastServerConfiguration.clusterName = configuration.id + hazelcastServerConfiguration.instanceName = configuration.nodeId + hazelcastServerConfiguration.properties = configuration.properties + hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig + joinedLite = hazelcastServerConfiguration.isLiteMember + val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) + /** Promote as CP Member */ + promoteAsCPMember(hazelcastInstance) + hazelcastInstance + } + } + else -> { + throw BluePrintProcessorException("couldn't understand the cluster configuration") + } + } + + /** Add the Membership Listeners */ + hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener()) + log.info( + "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." + ) + } + + override fun isClient(): Boolean { + return joinedClient + } + + override fun isLiteMember(): Boolean { + return joinedLite + } + + override fun clusterJoined(): Boolean { + return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning + } + + override suspend fun masterMember(partitionGroup: String): ClusterMember { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.first().toClusterMember() + } + + override suspend fun allMembers(): Set { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.map { it.toClusterMember() }.toSet() + } + + override suspend fun applicationMembers(appName: String): Set { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet() + } + + override suspend fun clusterMapStore(name: String): MutableMap { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getMap(name) + } + + /** + * The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(hazelcast, name) + } + + /** Return interface may change and it will be included in BluePrintClusterService */ + @UseExperimental + suspend fun clusterScheduler(name: String): IScheduledExecutorService { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getScheduledExecutorService(name) + } + + override suspend fun shutDown(duration: Duration) { + if (::hazelcast.isInitialized && clusterJoined()) { + delay(duration.toMillis()) + HazelcastClusterUtils.terminate(hazelcast) + } + } + + /** Utils */ + suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { + if (!joinedClient && !joinedLite) { + HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance) + } + } + + suspend fun myHazelcastApplicationMembers(): Map { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + check(!isClient()) { "not supported for cluster client members." } + return hazelcastApplicationMembers(ClusterUtils.applicationName()) + } + + suspend fun hazelcastApplicationMembers(appName: String): Map { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + val applicationMembers: MutableMap = hashMapOf() + hazelcast.cluster.members.map { member -> + val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) + if (memberName.startsWith(appName, true)) { + applicationMembers[memberName] = member + } + } + return applicationMembers + } +} + +open class BlueprintsClusterMembershipListener() : + MembershipListener { + private val log = logger(BlueprintsClusterMembershipListener::class) + + override fun memberRemoved(membershipEvent: MembershipEvent) { + log.info("MembershipEvent: $membershipEvent") + } + + override fun memberAdded(membershipEvent: MembershipEvent) { + log.info("MembershipEvent: $membershipEvent") + } +} + +open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { + private val log = logger(ClusterLockImpl::class) + + private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name) + + override fun name(): String { + return distributedLock.name + } + + override suspend fun lock() { + distributedLock.lock() + log.trace("Cluster lock($name) created..") + } + + override suspend fun tryLock(timeout: Long): Boolean { + return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) + .also { if (it) log.trace("Cluster lock acquired: $name") + else log.trace("Failed to acquire Cluster lock $name within timeout $timeout") } + } + + override suspend fun unLock() { + distributedLock.unlock() + log.trace("Cluster unlock(${name()}) successfully..") + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked + } + + override fun isLockedByCurrentThread(): Boolean { + return distributedLock.isLockedByCurrentThread + } + + override suspend fun fenceLock(): String { + val fence = distributedLock.lockAndGetFence() + log.trace("Cluster lock($name) fence($fence) created..") + return fence.toString() + } + + override suspend fun tryFenceLock(timeout: Long): String { + return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() + } + + override fun close() { + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt new file mode 100644 index 000000000..e5f488a0e --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt @@ -0,0 +1,117 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.CPGroup +import com.hazelcast.cp.CPMember +import com.hazelcast.cp.CPSubsystemManagementService +import com.hazelcast.instance.impl.HazelcastInstanceProxy +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.util.UUID +import java.util.concurrent.TimeUnit + +object HazelcastClusterUtils { + + private val log = logger(HazelcastClusterUtils::class) + + /** Promote [hazelcastInstance] member to CP Member */ + fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { + when (hazelcastInstance) { + is HazelcastInstanceProxy -> { + val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance) + cpSubsystemManagementService.promoteToCPMember() + .toCompletableFuture().get() + log.info("Promoted as CP member(${hazelcastInstance.cluster.localMember})") + val clusterCPMembers = clusterCPMembers(hazelcastInstance) + log.info("CP Members(${clusterCPMembers.size}): $clusterCPMembers") + val cpGroupMembers = cpGroupMembers(hazelcastInstance) + log.info("CP Group Members(${cpGroupMembers.size}): $cpGroupMembers") + } + else -> log.debug("Client instance not eligible for CP Member promotion") + } + } + + /** Terminate [hazelcastInstance] member */ + fun terminate(hazelcastInstance: HazelcastInstance) { + log.info("Terminating Member : ${hazelcastInstance.cluster.localMember}") + hazelcastInstance.lifecycleService.terminate() + } + + /** Remove [hazelcastInstance] member from cluster CP Member List*/ + fun removeFromCPMember(hazelcastInstance: HazelcastInstance) { + // check CP Member, then remove */ + val localCPMemberUuid = localCPMemberUUID(hazelcastInstance) + localCPMemberUuid?.let { uuid -> + removeFromCPMember(hazelcastInstance, uuid) + } + } + + /** Remove [removeCPMemberUuid] member from cluster CP Member List, using [hazelcastInstance]*/ + fun removeFromCPMember(hazelcastInstance: HazelcastInstance, removeCPMemberUuid: UUID) { + val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance) + cpSubsystemManagementService.removeCPMember(removeCPMemberUuid) + .toCompletableFuture().get() + log.info("Removed CP member($removeCPMemberUuid)") + } + + /** Get [hazelcastInstance] CP Group members*/ + fun cpGroupMembers(hazelcastInstance: HazelcastInstance): List { + return cpGroup(hazelcastInstance).members().toList() + } + + /** Get [hazelcastInstance] CP Group[groupName] members*/ + fun cpGroup( + hazelcastInstance: HazelcastInstance, + groupName: String? = CPGroup.METADATA_CP_GROUP_NAME + ): CPGroup { + return cpSubsystemManagementService(hazelcastInstance).getCPGroup(groupName) + .toCompletableFuture().get() + } + + /** Get [hazelcastInstance] CP member UUIDs*/ + fun clusterCPMemberUUIDs(hazelcastInstance: HazelcastInstance): List { + return clusterCPMembers(hazelcastInstance).map { it.uuid } + } + + /** Get [hazelcastInstance] CP members*/ + fun clusterCPMembers(hazelcastInstance: HazelcastInstance): List { + return cpSubsystemManagementService(hazelcastInstance).cpMembers.toCompletableFuture().get().toList() + } + + /** Get CPSubsystemManagementService for [hazelcastInstance] */ + fun cpSubsystemManagementService(hazelcastInstance: HazelcastInstance): CPSubsystemManagementService { + val cpSubsystemManagementService = hazelcastInstance.cpSubsystem.cpSubsystemManagementService + cpSubsystemManagementService.awaitUntilDiscoveryCompleted(3, TimeUnit.MINUTES) + return cpSubsystemManagementService + } + + /** Get local CPMemberUUID for [hazelcastInstance] */ + fun localCPMemberUUID(hazelcastInstance: HazelcastInstance) = localCPMember(hazelcastInstance)?.uuid + + /** Check local member is CP member for [hazelcastInstance] */ + fun checkLocalMemberIsCPMember(hazelcastInstance: HazelcastInstance): Boolean { + return localCPMember(hazelcastInstance) != null + } + + /** Get local CP member for [hazelcastInstance] */ + fun localCPMember(hazelcastInstance: HazelcastInstance) = + cpSubsystemManagementService(hazelcastInstance).localCPMember + + /** Get local CP member UUID for [hazelcastInstance] */ + fun localMemberUUID(hazelcastInstance: HazelcastInstance) = hazelcastInstance.cluster.localMember.uuid +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt deleted file mode 100644 index feb2a8e2a..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster - -import com.hazelcast.client.HazelcastClient -import com.hazelcast.client.config.ClientConfig -import com.hazelcast.client.config.YamlClientConfigBuilder -import com.hazelcast.cluster.Member -import com.hazelcast.cluster.MembershipEvent -import com.hazelcast.cluster.MembershipListener -import com.hazelcast.config.Config -import com.hazelcast.config.FileSystemYamlConfig -import com.hazelcast.config.MemberAttributeConfig -import com.hazelcast.core.Hazelcast -import com.hazelcast.core.HazelcastInstance -import com.hazelcast.cp.CPSubsystemManagementService -import com.hazelcast.cp.lock.FencedLock -import com.hazelcast.scheduledexecutor.IScheduledExecutorService -import kotlinx.coroutines.delay -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile -import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils -import org.springframework.stereotype.Service -import java.time.Duration -import java.util.concurrent.TimeUnit - -@Service -open class HazlecastClusterService : BluePrintClusterService { - - private val log = logger(HazlecastClusterService::class) - lateinit var hazelcast: HazelcastInstance - lateinit var cpSubsystemManagementService: CPSubsystemManagementService - var joinedClient = false - var joinedLite = false - - override suspend fun startCluster(configuration: T) { - /** Get the Hazelcast Client or Server instance */ - hazelcast = - when (configuration) { - is Config -> { - joinedLite = configuration.isLiteMember - val hazelcastInstance = Hazelcast.newHazelcastInstance(configuration) - /** Promote as CP Member */ - promoteAsCPMember(hazelcastInstance) - hazelcastInstance - } - is ClientConfig -> { - joinedClient = true - HazelcastClient.newHazelcastClient(configuration) - } - is ClusterInfo -> { - - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id) - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId) - - val memberAttributeConfig = MemberAttributeConfig() - memberAttributeConfig.setAttribute( - BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, - configuration.nodeId - ) - - val configFile = configuration.configFile - /** Check file exists */ - val clusterConfigFile = normalizedFile(configuration.configFile) - check(clusterConfigFile.absolutePath.endsWith("yaml", true)) { - "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml" - } - check(clusterConfigFile.exists()) { - "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})" - } - log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****") - - /** Hazelcast Client from config file */ - if (configuration.joinAsClient) { - /** Set the configuration file to system properties, so that Hazelcast will read automatically */ - System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath) - joinedClient = true - val hazelcastClientConfiguration = YamlClientConfigBuilder().build() - hazelcastClientConfiguration.properties = configuration.properties - HazelcastClient.newHazelcastClient(hazelcastClientConfiguration) - } else { - /** Hazelcast Server from config file */ - val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile)) - hazelcastServerConfiguration.clusterName = configuration.id - hazelcastServerConfiguration.instanceName = configuration.nodeId - hazelcastServerConfiguration.properties = configuration.properties - hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig - joinedLite = hazelcastServerConfiguration.isLiteMember - val hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) - /** Promote as CP Member */ - promoteAsCPMember(hazelcastInstance) - hazelcastInstance - } - } - else -> { - throw BluePrintProcessorException("couldn't understand the cluster configuration") - } - } - - /** Add the Membership Listeners */ - hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener()) - log.info( - "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." - ) - } - - override fun isClient(): Boolean { - return joinedClient - } - - override fun isLiteMember(): Boolean { - return joinedLite - } - - override fun clusterJoined(): Boolean { - return ::hazelcast.isInitialized && hazelcast.lifecycleService.isRunning - } - - override suspend fun masterMember(partitionGroup: String): ClusterMember { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - return hazelcast.cluster.members.first().toClusterMember() - } - - override suspend fun allMembers(): Set { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - return hazelcast.cluster.members.map { it.toClusterMember() }.toSet() - } - - override suspend fun applicationMembers(appName: String): Set { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet() - } - - override suspend fun clusterMapStore(name: String): MutableMap { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - return hazelcast.getMap(name) - } - - /** - * The DistributedLock is a distributed implementation of Java’s Lock. - * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to - * determine ordering of multiple concurrent lock holders. - * DistributedLocks are designed to account for failures within the cluster. - * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, - * the lock will be released and granted to the next waiting process. - */ - override suspend fun clusterLock(name: String): ClusterLock { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - return ClusterLockImpl(hazelcast, name) - } - - /** Return interface may change and it will be included in BluePrintClusterService */ - @UseExperimental - suspend fun clusterScheduler(name: String): IScheduledExecutorService { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - return hazelcast.getScheduledExecutorService(name) - } - - override suspend fun shutDown(duration: Duration) { - if (::hazelcast.isInitialized && clusterJoined()) { - delay(duration.toMillis()) - HazlecastClusterUtils.terminate(hazelcast) - } - } - - /** Utils */ - suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { - if (!joinedClient && !joinedLite) { - HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance) - } - } - - suspend fun myHazelcastApplicationMembers(): Map { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - check(!isClient()) { "not supported for cluster client members." } - return hazelcastApplicationMembers(ClusterUtils.applicationName()) - } - - suspend fun hazelcastApplicationMembers(appName: String): Map { - check(::hazelcast.isInitialized) { "failed to start and join cluster" } - val applicationMembers: MutableMap = hashMapOf() - hazelcast.cluster.members.map { member -> - val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) - if (memberName.startsWith(appName, true)) { - applicationMembers[memberName] = member - } - } - return applicationMembers - } -} - -open class BlueprintsClusterMembershipListener() : - MembershipListener { - private val log = logger(BlueprintsClusterMembershipListener::class) - - override fun memberRemoved(membershipEvent: MembershipEvent) { - log.info("MembershipEvent: $membershipEvent") - } - - override fun memberAdded(membershipEvent: MembershipEvent) { - log.info("MembershipEvent: $membershipEvent") - } -} - -open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { - private val log = logger(ClusterLockImpl::class) - - private val distributedLock: FencedLock = hazelcast.cpSubsystem.getLock(name) - - override fun name(): String { - return distributedLock.name - } - - override suspend fun lock() { - distributedLock.lock() - log.trace("Cluster lock($name) created..") - } - - override suspend fun tryLock(timeout: Long): Boolean { - return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) - .also { if (it) log.trace("Cluster lock acquired: $name") - else log.trace("Failed to acquire Cluster lock $name within timeout $timeout") } - } - - override suspend fun unLock() { - // Added condition to avoid failures like - "Current thread is not owner of the lock!" - if (distributedLock.isLockedByCurrentThread) { - distributedLock.unlock() - log.trace("Cluster unlock(${name()}) successfully..") - } - } - - override fun isLocked(): Boolean { - return distributedLock.isLocked - } - - override suspend fun fenceLock(): String { - val fence = distributedLock.lockAndGetFence() - log.trace("Cluster lock($name) fence($fence) created..") - return fence.toString() - } - - override suspend fun tryFenceLock(timeout: Long): String { - return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() - } - - override fun close() { - } -} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt deleted file mode 100644 index 70970f6da..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster - -import com.hazelcast.core.HazelcastInstance -import com.hazelcast.cp.CPGroup -import com.hazelcast.cp.CPMember -import com.hazelcast.cp.CPSubsystemManagementService -import com.hazelcast.instance.impl.HazelcastInstanceProxy -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.util.UUID -import java.util.concurrent.TimeUnit - -object HazlecastClusterUtils { - - private val log = logger(HazlecastClusterUtils::class) - - /** Promote [hazelcastInstance] member to CP Member */ - fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { - when (hazelcastInstance) { - is HazelcastInstanceProxy -> { - val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance) - cpSubsystemManagementService.promoteToCPMember() - .toCompletableFuture().get() - log.info("Promoted as CP member(${hazelcastInstance.cluster.localMember})") - val clusterCPMembers = clusterCPMembers(hazelcastInstance) - log.info("CP Members(${clusterCPMembers.size}): $clusterCPMembers") - val cpGroupMembers = cpGroupMembers(hazelcastInstance) - log.info("CP Group Members(${cpGroupMembers.size}): $cpGroupMembers") - } - else -> log.debug("Client instance not eligible for CP Member promotion") - } - } - - /** Terminate [hazelcastInstance] member */ - fun terminate(hazelcastInstance: HazelcastInstance) { - log.info("Terminating Member : ${hazelcastInstance.cluster.localMember}") - hazelcastInstance.lifecycleService.terminate() - } - - /** Remove [hazelcastInstance] member from cluster CP Member List*/ - fun removeFromCPMember(hazelcastInstance: HazelcastInstance) { - // check CP Member, then remove */ - val localCPMemberUuid = localCPMemberUUID(hazelcastInstance) - localCPMemberUuid?.let { uuid -> - removeFromCPMember(hazelcastInstance, uuid) - } - } - - /** Remove [removeCPMemberUuid] member from cluster CP Member List, using [hazelcastInstance]*/ - fun removeFromCPMember(hazelcastInstance: HazelcastInstance, removeCPMemberUuid: UUID) { - val cpSubsystemManagementService = cpSubsystemManagementService(hazelcastInstance) - cpSubsystemManagementService.removeCPMember(removeCPMemberUuid) - .toCompletableFuture().get() - log.info("Removed CP member($removeCPMemberUuid)") - } - - /** Get [hazelcastInstance] CP Group members*/ - fun cpGroupMembers(hazelcastInstance: HazelcastInstance): List { - return cpGroup(hazelcastInstance).members().toList() - } - - /** Get [hazelcastInstance] CP Group[groupName] members*/ - fun cpGroup( - hazelcastInstance: HazelcastInstance, - groupName: String? = CPGroup.METADATA_CP_GROUP_NAME - ): CPGroup { - return cpSubsystemManagementService(hazelcastInstance).getCPGroup(groupName) - .toCompletableFuture().get() - } - - /** Get [hazelcastInstance] CP member UUIDs*/ - fun clusterCPMemberUUIDs(hazelcastInstance: HazelcastInstance): List { - return clusterCPMembers(hazelcastInstance).map { it.uuid } - } - - /** Get [hazelcastInstance] CP members*/ - fun clusterCPMembers(hazelcastInstance: HazelcastInstance): List { - return cpSubsystemManagementService(hazelcastInstance).cpMembers.toCompletableFuture().get().toList() - } - - /** Get CPSubsystemManagementService for [hazelcastInstance] */ - fun cpSubsystemManagementService(hazelcastInstance: HazelcastInstance): CPSubsystemManagementService { - val cpSubsystemManagementService = hazelcastInstance.cpSubsystem.cpSubsystemManagementService - cpSubsystemManagementService.awaitUntilDiscoveryCompleted(3, TimeUnit.MINUTES) - return cpSubsystemManagementService - } - - /** Get local CPMemberUUID for [hazelcastInstance] */ - fun localCPMemberUUID(hazelcastInstance: HazelcastInstance) = localCPMember(hazelcastInstance)?.uuid - - /** Check local member is CP member for [hazelcastInstance] */ - fun checkLocalMemberIsCPMember(hazelcastInstance: HazelcastInstance): Boolean { - return localCPMember(hazelcastInstance) != null - } - - /** Get local CP member for [hazelcastInstance] */ - fun localCPMember(hazelcastInstance: HazelcastInstance) = - cpSubsystemManagementService(hazelcastInstance).localCPMember - - /** Get local CP member UUID for [hazelcastInstance] */ - fun localMemberUUID(hazelcastInstance: HazelcastInstance) = hazelcastInstance.cluster.localMember.uuid -} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 9725553a5..2d957c289 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -78,6 +78,7 @@ interface ClusterLock { suspend fun tryFenceLock(timeout: Long): String suspend fun unLock() fun isLocked(): Boolean + fun isLockedByCurrentThread(): Boolean fun close() } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt new file mode 100644 index 000000000..e214b6593 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -0,0 +1,236 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.fasterxml.jackson.databind.JsonNode +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.instance.impl.HazelcastInstanceFactory +import com.hazelcast.map.IMap +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.After +import org.junit.Before +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import java.io.Serializable +import java.util.Properties +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +class HazelcastClusterServiceTest { + private val log = logger(HazelcastClusterServiceTest::class) + private val clusterSize = 3 + + @Before + @After + fun killAllHazelcastInstances() { + HazelcastInstanceFactory.terminateAll() + } + + @Test + fun testClientFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + System.setProperty( + "hazelcast.client.config", + normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath + ) + val config = YamlClientConfigBuilder().build() + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testServerFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml") + val config = FileSystemYamlConfig(configFile) + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testClusterJoin() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(1, 2, 3)).toMutableList() + printReachableMembers(bluePrintClusterServiceOne) + testDistributedStore(bluePrintClusterServiceOne) + testDistributedLock(bluePrintClusterServiceOne) + } + } + + private suspend fun createCluster( + ids: List, + joinAsClient: Boolean? = false + ): List { + + return withContext(Dispatchers.Default) { + val deferred = ids.map { id -> + async(Dispatchers.IO) { + val nodeId = "node-$id" + log.info("********** Starting ($nodeId)") + val properties = Properties() + properties["hazelcast.logging.type"] = "slf4j" + val clusterInfo = + if (joinAsClient!!) { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = true, + configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml", + properties = properties + ) + } else { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = false, + configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml", + properties = properties + ) + } + val hazelcastClusterService = HazelcastClusterService() + hazelcastClusterService.startCluster(clusterInfo) + hazelcastClusterService + } + } + deferred.awaitAll() + } + } + + private suspend fun testDistributedStore(bluePrintClusterServices: List) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = bluePrintClusterServices[0].clusterMapStore( + "blueprint-runtime-$storeId" + ) as IMap + assertNotNull(store, "failed to get store") + repeat(5) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + + val store1 = bluePrintClusterServices[1].clusterMapStore( + "blueprint-runtime-$storeId" + ) as IMap + + store1.values.map { + log.trace("Received map event : $it") + } + delay(5) + store.clear() + } + } + + private suspend fun testDistributedLock(bluePrintClusterServices: List) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + newSingleThreadContext("first").use { + withContext(it) { + executeLock(bluePrintClusterServices[0], "first", lockName) + } + } + } + val deferred2 = async { + newSingleThreadContext("second").use { + withContext(it) { + executeLock(bluePrintClusterServices[1], "second", lockName) + } + } + } + val deferred3 = async { + newSingleThreadContext("third").use { + withContext(it) { + executeLock(bluePrintClusterServices[2], "third", lockName) + } + } + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock( + bluePrintClusterService: BluePrintClusterService, + lockId: String, + lockName: String + ) { + log.info("initialising $lockId lock...") + val distributedLock = bluePrintClusterService.clusterLock(lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked(), "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unLock() + log.info("$lockId lock released") + } + distributedLock.close() + } + + private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { + log.info("initialising ...") + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService + + val memberNameMap = bluePrintClusterService.clusterMapStore("member-name-map") as IMap + assertEquals(3, memberNameMap.size, "failed to match member size") + memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } + val scheduler = hazelcastClusterService.clusterScheduler("cleanup") + // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) + // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) + // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) + // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS) + } + + private suspend fun printReachableMembers(bluePrintClusterServices: List) { + bluePrintClusterServices.forEach { bluePrintClusterService -> + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService + val hazelcast = hazelcastClusterService.hazelcast + val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null + val master = hazelcastClusterService.masterMember("system").memberAddress + val members = hazelcastClusterService.allMembers().map { it.memberAddress } + log.info("Cluster Members for($self): master($master) Members($members)") + } + + val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-") + assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") + log.info("Cluster applicationMembers ($applicationMembers)") + } +} + +open class SampleSchedulerTask : Runnable, Serializable { + private val log = logger(SampleSchedulerTask::class) + override fun run() { + log.info("I am scheduler action") + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt deleted file mode 100644 index 80cf41558..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster - -import com.fasterxml.jackson.databind.JsonNode -import com.hazelcast.client.config.YamlClientConfigBuilder -import com.hazelcast.cluster.Member -import com.hazelcast.config.FileSystemYamlConfig -import com.hazelcast.map.IMap -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile -import java.io.Serializable -import java.time.Duration -import java.util.Properties -import kotlin.test.assertEquals -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -class HazlecastClusterServiceTest { - private val log = logger(HazlecastClusterServiceTest::class) - private val clusterSize = 3 - - @Test - fun testClientFileSystemYamlConfig() { - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") - System.setProperty( - "hazelcast.client.config", - normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath - ) - val config = YamlClientConfigBuilder().build() - assertNotNull(config) - assertEquals("test-cluster", config.clusterName) - assertEquals("node-1234", config.instanceName) - } - - @Test - fun testServerFileSystemYamlConfig() { - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") - System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") - val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml") - val config = FileSystemYamlConfig(configFile) - assertNotNull(config) - assertEquals("test-cluster", config.clusterName) - assertEquals("node-1234", config.instanceName) - } - - @Test - fun testClusterJoin() { - runBlocking { - val bluePrintClusterServiceOne = - createCluster(arrayListOf(5679, 5680, 5681)).toMutableList() - // delay(1000) - // Join as Hazlecast Management Node - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false) - // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) - printReachableMembers(bluePrintClusterServiceOne) - testDistributedStore(bluePrintClusterServiceOne) - testDistributedLock(bluePrintClusterServiceOne) - - // executeScheduler(bluePrintClusterServiceOne[0]) - // delay(1000) - // Shutdown - shutdown(bluePrintClusterServiceOne) - } - } - - private suspend fun createCluster( - ports: List, - joinAsClient: Boolean? = false - ): List { - - return withContext(Dispatchers.Default) { - val deferred = ports.map { port -> - async(Dispatchers.IO) { - val nodeId = "node-$port" - log.info("********** Starting node($nodeId) on port($port)") - val properties = Properties() - properties["hazelcast.logging.type"] = "slf4j" - val clusterInfo = - if (joinAsClient!!) { - ClusterInfo( - id = "test-cluster", nodeId = nodeId, joinAsClient = true, - configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml", - properties = properties - ) - } else { - ClusterInfo( - id = "test-cluster", nodeId = nodeId, joinAsClient = false, - configFile = "./src/test/resources/hazelcast/hazelcast-cluster.yaml", - properties = properties - ) - } - val hazlecastClusterService = HazlecastClusterService() - hazlecastClusterService.startCluster(clusterInfo) - hazlecastClusterService - } - } - deferred.awaitAll() - } - } - - private suspend fun shutdown(bluePrintClusterServices: List) { - bluePrintClusterServices.forEach { bluePrintClusterService -> - bluePrintClusterService.shutDown(Duration.ofMillis(10)) - } - } - - private suspend fun testDistributedStore(bluePrintClusterServices: List) { - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = bluePrintClusterServices[0].clusterMapStore( - "blueprint-runtime-$storeId" - ) as IMap - assertNotNull(store, "failed to get store") - repeat(5) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - - val store1 = bluePrintClusterServices[1].clusterMapStore( - "blueprint-runtime-$storeId" - ) as IMap - - store1.values.map { - log.trace("Received map event : $it") - } - delay(5) - store.clear() - } - } - - private suspend fun testDistributedLock(bluePrintClusterServices: List) { - val lockName = "sample-lock" - withContext(Dispatchers.IO) { - val deferred = async { - executeLock(bluePrintClusterServices[0], "first", lockName) - } - val deferred2 = async { - executeLock(bluePrintClusterServices[1], "second", lockName) - } - val deferred3 = async { - executeLock(bluePrintClusterServices[2], "third", lockName) - } - deferred.start() - deferred2.start() - deferred3.start() - } - } - - private suspend fun executeLock( - bluePrintClusterService: BluePrintClusterService, - lockId: String, - lockName: String - ) { - log.info("initialising $lockId lock...") - val distributedLock = bluePrintClusterService.clusterLock(lockName) - assertNotNull(distributedLock, "failed to create distributed $lockId lock") - distributedLock.lock() - assertTrue(distributedLock.isLocked(), "failed to lock $lockId") - try { - log.info("locked $lockId process for 5mSec") - delay(5) - } finally { - distributedLock.unLock() - log.info("$lockId lock released") - } - distributedLock.close() - } - - private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { - log.info("initialising ...") - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService - - val memberNameMap = bluePrintClusterService.clusterMapStore("member-name-map") as IMap - assertEquals(3, memberNameMap.size, "failed to match member size") - memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } - val scheduler = hazlecastClusterService.clusterScheduler("cleanup") - // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) - // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) - // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) - // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS) - } - - private suspend fun printReachableMembers(bluePrintClusterServices: List) { - bluePrintClusterServices.forEach { bluePrintClusterService -> - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService - val hazelcast = hazlecastClusterService.hazelcast - val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null - val master = hazlecastClusterService.masterMember("system").memberAddress - val members = hazlecastClusterService.allMembers().map { it.memberAddress } - log.info("Cluster Members for($self): master($master) Members($members)") - } - - val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56") - assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") - log.info("Cluster applicationMembers ($applicationMembers)") - } -} - -open class SampleSchedulerTask : Runnable, Serializable { - private val log = logger(SampleSchedulerTask::class) - override fun run() { - log.info("I am scheduler action") - } -} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml index de6047a90..b4dc3454a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml @@ -7,10 +7,15 @@ hazelcast: session-time-to-live-seconds: 60 session-heartbeat-interval-seconds: 5 missing-cp-member-auto-removal-seconds: 120 + metrics: + enabled: false network: join: - multicast: + tcp-ip: enabled: true + interface: 127.0.0.1 + multicast: + enabled: false # Specify 224.0.0.1 instead of default 224.2.2.3 since there's some issue # on macOs with docker installed and multicast address different than 224.0.0.1 # https://stackoverflow.com/questions/46341715/hazelcast-multicast-does-not-work-because-of-vboxnet-which-is-used-by-docker-mac -- cgit 1.2.3-korg