diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/processor-core/src/main')
3 files changed, 322 insertions, 8 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt new file mode 100644 index 000000000..85d9d5c27 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -0,0 +1,46 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.cluster.Member +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** + * Exposed Dependency Service by this Hazlecast Lib Module + */ +fun BluePrintDependencyService.clusterService(): BluePrintClusterService = + instance(HazlecastClusterService::class) + +/** Optional Cluster Service, returns only if Cluster is enabled */ +fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { + return if (BluePrintConstants.CLUSTER_ENABLED) { + BluePrintDependencyService.clusterService() + } else null +} + +/** Extension to convert Hazelcast Member to Blueprints Cluster Member */ +fun Member.toClusterMember(): ClusterMember { + val memberName: String = this.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: this.uuid.toString() + return ClusterMember( + id = this.uuid.toString(), + name = memberName, + memberAddress = this.address.toString() + ) +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt new file mode 100644 index 000000000..83a04d653 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt @@ -0,0 +1,252 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.client.HazelcastClient +import com.hazelcast.client.config.ClientConfig +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.cluster.MembershipEvent +import com.hazelcast.cluster.MembershipListener +import com.hazelcast.config.Config +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.config.MemberAttributeConfig +import com.hazelcast.core.Hazelcast +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.lock.FencedLock +import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.TimeUnit + +@Service +open class HazlecastClusterService : BluePrintClusterService { + + private val log = logger(HazlecastClusterService::class) + lateinit var hazelcast: HazelcastInstance + var joinedClient = false + var joinedLite = false + + override suspend fun <T> startCluster(configuration: T) { + /** Get the Hazelcast Cliet or Server instance */ + hazelcast = + when (configuration) { + is Config -> { + joinedLite = configuration.isLiteMember + Hazelcast.newHazelcastInstance(configuration) + } + is ClientConfig -> { + joinedClient = true + HazelcastClient.newHazelcastClient(configuration) + } + is ClusterInfo -> { + + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id) + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId) + + val memberAttributeConfig = MemberAttributeConfig() + memberAttributeConfig.setAttribute( + BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, + configuration.nodeId + ) + + val configFile = configuration.configFile + /** Check file exists */ + val clusterConfigFile = normalizedFile(configuration.configFile) + check(clusterConfigFile.absolutePath.endsWith("yaml", true)) { + "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml" + } + check(clusterConfigFile.exists()) { + "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})" + } + log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****") + + /** Hazelcast Client from config file */ + if (configuration.joinAsClient) { + /** Set the configuration file to system properties, so that Hazelcast will read automatically */ + System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath) + joinedClient = true + val hazelcastClientConfiguration = YamlClientConfigBuilder().build() + hazelcastClientConfiguration.properties = configuration.properties + HazelcastClient.newHazelcastClient(hazelcastClientConfiguration) + } else { + /** Hazelcast Server from config file */ + val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile)) + hazelcastServerConfiguration.properties = configuration.properties + hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig + joinedLite = hazelcastServerConfiguration.isLiteMember + Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) + } + } + else -> { + throw BluePrintProcessorException("couldn't understand the cluster configuration") + } + } + + /** Add the Membership Listeners */ + hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this)) + log.info( + "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." + ) + } + + override fun isClient(): Boolean { + return joinedClient + } + + override fun isLiteMember(): Boolean { + return joinedLite + } + + override fun clusterJoined(): Boolean { + return hazelcast.lifecycleService.isRunning + } + + override suspend fun masterMember(partitionGroup: String): ClusterMember { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.first().toClusterMember() + } + + override suspend fun allMembers(): Set<ClusterMember> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.map { it.toClusterMember() }.toSet() + } + + override suspend fun applicationMembers(appName: String): Set<ClusterMember> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet() + } + + override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getMap<String, T>(name) + } + + /** + * The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(hazelcast, name) + } + + /** Return interface may change and it will be included in BluePrintClusterService */ + @UseExperimental + suspend fun clusterScheduler(name: String): IScheduledExecutorService { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getScheduledExecutorService(name) + } + + override suspend fun shutDown(duration: Duration) { + if (::hazelcast.isInitialized && clusterJoined()) { + delay(duration.toMillis()) + hazelcast.lifecycleService.terminate() + } + } + + /** Utils */ + suspend fun myHazelcastApplicationMembers(): Map<String, Member> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + check(!isClient()) { "not supported for cluster client members." } + return hazelcastApplicationMembers(ClusterUtils.applicationName()) + } + + suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + val applicationMembers: MutableMap<String, Member> = hashMapOf() + hazelcast.cluster.members.map { member -> + val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) + if (memberName.startsWith(appName, true)) { + applicationMembers[memberName] = member + } + } + return applicationMembers + } +} + +open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) : + MembershipListener { + private val log = logger(BlueprintsClusterMembershipListener::class) + + override fun memberRemoved(membershipEvent: MembershipEvent) { + log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent") + } + + override fun memberAdded(membershipEvent: MembershipEvent) { + log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent") + } +} + +open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { + private val log = logger(ClusterLockImpl::class) + + lateinit var distributedLock: FencedLock + + override fun name(): String { + return distributedLock.name + } + + override suspend fun lock() { + distributedLock = hazelcast.cpSubsystem.getLock(name) + distributedLock.lock() + log.trace("Cluster lock($name) created..") + } + + override suspend fun tryLock(timeout: Long): Boolean { + distributedLock = hazelcast.cpSubsystem.getLock(name) + return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) + } + + override suspend fun unLock() { + distributedLock.unlock() + log.trace("Cluster unlock(${name()}) successfully..") + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked + } + + override suspend fun fenceLock(): String { + distributedLock = hazelcast.cpSubsystem.getLock(name) + val fence = distributedLock.lockAndGetFence() + log.trace("Cluster lock($name) fence($fence) created..") + return fence.toString() + } + + override suspend fun tryFenceLock(timeout: Long): String { + distributedLock = hazelcast.cpSubsystem.getLock(name) + return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() + } + + override fun close() { + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index f994628a2..53f18d38a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -17,23 +17,33 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.service import java.time.Duration +import java.util.Properties interface BluePrintClusterService { /** Start the cluster with [clusterInfo], By default clustering service is disabled. * Application module has to start cluster */ - suspend fun startCluster(clusterInfo: ClusterInfo) + suspend fun <T> startCluster(configuration: T) fun clusterJoined(): Boolean + fun isClient(): Boolean + + fun isLiteMember(): Boolean + /** Returns [partitionGroup] master member */ suspend fun masterMember(partitionGroup: String): ClusterMember /** Returns all the data cluster members */ suspend fun allMembers(): Set<ClusterMember> - /** Returns data cluster members starting with prefix */ - suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> + /** + * Returns application cluster members for [appName] joined as server or lite member, + * Node joined as client won't be visible. Here the assumption is node-id is combination of + * application id and replica number, for an example Application cds-cluster then the node ids will be + * cds-cluster-1, cds-cluster-2, cds-cluster-3 + */ + suspend fun applicationMembers(appName: String): Set<ClusterMember> /** Create and get or get the distributed data map store with [name] */ suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> @@ -47,19 +57,25 @@ interface BluePrintClusterService { data class ClusterInfo( val id: String, - var configFile: String? = null, val nodeId: String, - val nodeAddress: String, - var clusterMembers: List<String>, - var storagePath: String + var joinAsClient: Boolean = false, + var properties: Properties?, + var configFile: String ) -data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) +data class ClusterMember( + val id: String, + val name: String, + val memberAddress: String?, + val state: String? = null +) interface ClusterLock { fun name(): String suspend fun lock() + suspend fun fenceLock(): String suspend fun tryLock(timeout: Long): Boolean + suspend fun tryFenceLock(timeout: Long): String suspend fun unLock() fun isLocked(): Boolean fun close() |