summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/processor-core/src/main
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-02-05 15:51:03 -0500
committerBrinda Santh <bs2796@att.com>2020-02-12 14:16:28 -0500
commit65bb9d0d83762e8fa8e3ab568c801908eafa0686 (patch)
treead8ae7fafc1954b44ddd9b72d1fceb482f042431 /ms/blueprintsprocessor/modules/commons/processor-core/src/main
parent723cb0b0f4fca052561f21bb8312bf7c6e8cd524 (diff)
Cluster co-ordination with Hazelcast.
Remove Atomix implementation, due to Kubernetes clustering issues. Cluster environment property changes. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I23f40c92c0adc6b3ab8690871385f78525c76433
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/processor-core/src/main')
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt46
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt252
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt32
3 files changed, 322 insertions, 8 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
new file mode 100644
index 000000000..85d9d5c27
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+import com.hazelcast.cluster.Member
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+/**
+ * Exposed Dependency Service by this Hazlecast Lib Module
+ */
+fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
+ instance(HazlecastClusterService::class)
+
+/** Optional Cluster Service, returns only if Cluster is enabled */
+fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
+ return if (BluePrintConstants.CLUSTER_ENABLED) {
+ BluePrintDependencyService.clusterService()
+ } else null
+}
+
+/** Extension to convert Hazelcast Member to Blueprints Cluster Member */
+fun Member.toClusterMember(): ClusterMember {
+ val memberName: String = this.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: this.uuid.toString()
+ return ClusterMember(
+ id = this.uuid.toString(),
+ name = memberName,
+ memberAddress = this.address.toString()
+ )
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
new file mode 100644
index 000000000..83a04d653
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
@@ -0,0 +1,252 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+import com.hazelcast.client.HazelcastClient
+import com.hazelcast.client.config.ClientConfig
+import com.hazelcast.client.config.YamlClientConfigBuilder
+import com.hazelcast.cluster.Member
+import com.hazelcast.cluster.MembershipEvent
+import com.hazelcast.cluster.MembershipListener
+import com.hazelcast.config.Config
+import com.hazelcast.config.FileSystemYamlConfig
+import com.hazelcast.config.MemberAttributeConfig
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.core.HazelcastInstance
+import com.hazelcast.cp.lock.FencedLock
+import com.hazelcast.scheduledexecutor.IScheduledExecutorService
+import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+import org.springframework.stereotype.Service
+import java.time.Duration
+import java.util.concurrent.TimeUnit
+
+@Service
+open class HazlecastClusterService : BluePrintClusterService {
+
+ private val log = logger(HazlecastClusterService::class)
+ lateinit var hazelcast: HazelcastInstance
+ var joinedClient = false
+ var joinedLite = false
+
+ override suspend fun <T> startCluster(configuration: T) {
+ /** Get the Hazelcast Cliet or Server instance */
+ hazelcast =
+ when (configuration) {
+ is Config -> {
+ joinedLite = configuration.isLiteMember
+ Hazelcast.newHazelcastInstance(configuration)
+ }
+ is ClientConfig -> {
+ joinedClient = true
+ HazelcastClient.newHazelcastClient(configuration)
+ }
+ is ClusterInfo -> {
+
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
+
+ val memberAttributeConfig = MemberAttributeConfig()
+ memberAttributeConfig.setAttribute(
+ BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
+ configuration.nodeId
+ )
+
+ val configFile = configuration.configFile
+ /** Check file exists */
+ val clusterConfigFile = normalizedFile(configuration.configFile)
+ check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
+ "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
+ }
+ check(clusterConfigFile.exists()) {
+ "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
+ }
+ log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
+
+ /** Hazelcast Client from config file */
+ if (configuration.joinAsClient) {
+ /** Set the configuration file to system properties, so that Hazelcast will read automatically */
+ System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
+ joinedClient = true
+ val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
+ hazelcastClientConfiguration.properties = configuration.properties
+ HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
+ } else {
+ /** Hazelcast Server from config file */
+ val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
+ hazelcastServerConfiguration.properties = configuration.properties
+ hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
+ joinedLite = hazelcastServerConfiguration.isLiteMember
+ Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
+ }
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't understand the cluster configuration")
+ }
+ }
+
+ /** Add the Membership Listeners */
+ hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this))
+ log.info(
+ "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
+ )
+ }
+
+ override fun isClient(): Boolean {
+ return joinedClient
+ }
+
+ override fun isLiteMember(): Boolean {
+ return joinedLite
+ }
+
+ override fun clusterJoined(): Boolean {
+ return hazelcast.lifecycleService.isRunning
+ }
+
+ override suspend fun masterMember(partitionGroup: String): ClusterMember {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.cluster.members.first().toClusterMember()
+ }
+
+ override suspend fun allMembers(): Set<ClusterMember> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
+ }
+
+ override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
+ }
+
+ override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.getMap<String, T>(name)
+ }
+
+ /**
+ * The DistributedLock is a distributed implementation of Java’s Lock.
+ * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+ * determine ordering of multiple concurrent lock holders.
+ * DistributedLocks are designed to account for failures within the cluster.
+ * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+ * the lock will be released and granted to the next waiting process.
+ */
+ override suspend fun clusterLock(name: String): ClusterLock {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return ClusterLockImpl(hazelcast, name)
+ }
+
+ /** Return interface may change and it will be included in BluePrintClusterService */
+ @UseExperimental
+ suspend fun clusterScheduler(name: String): IScheduledExecutorService {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.getScheduledExecutorService(name)
+ }
+
+ override suspend fun shutDown(duration: Duration) {
+ if (::hazelcast.isInitialized && clusterJoined()) {
+ delay(duration.toMillis())
+ hazelcast.lifecycleService.terminate()
+ }
+ }
+
+ /** Utils */
+ suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ check(!isClient()) { "not supported for cluster client members." }
+ return hazelcastApplicationMembers(ClusterUtils.applicationName())
+ }
+
+ suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ val applicationMembers: MutableMap<String, Member> = hashMapOf()
+ hazelcast.cluster.members.map { member ->
+ val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
+ if (memberName.startsWith(appName, true)) {
+ applicationMembers[memberName] = member
+ }
+ }
+ return applicationMembers
+ }
+}
+
+open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) :
+ MembershipListener {
+ private val log = logger(BlueprintsClusterMembershipListener::class)
+
+ override fun memberRemoved(membershipEvent: MembershipEvent) {
+ log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent")
+ }
+
+ override fun memberAdded(membershipEvent: MembershipEvent) {
+ log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent")
+ }
+}
+
+open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
+ private val log = logger(ClusterLockImpl::class)
+
+ lateinit var distributedLock: FencedLock
+
+ override fun name(): String {
+ return distributedLock.name
+ }
+
+ override suspend fun lock() {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ distributedLock.lock()
+ log.trace("Cluster lock($name) created..")
+ }
+
+ override suspend fun tryLock(timeout: Long): Boolean {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
+ }
+
+ override suspend fun unLock() {
+ distributedLock.unlock()
+ log.trace("Cluster unlock(${name()}) successfully..")
+ }
+
+ override fun isLocked(): Boolean {
+ return distributedLock.isLocked
+ }
+
+ override suspend fun fenceLock(): String {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ val fence = distributedLock.lockAndGetFence()
+ log.trace("Cluster lock($name) fence($fence) created..")
+ return fence.toString()
+ }
+
+ override suspend fun tryFenceLock(timeout: Long): String {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
+ }
+
+ override fun close() {
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index f994628a2..53f18d38a 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -17,23 +17,33 @@
package org.onap.ccsdk.cds.blueprintsprocessor.core.service
import java.time.Duration
+import java.util.Properties
interface BluePrintClusterService {
/** Start the cluster with [clusterInfo], By default clustering service is disabled.
* Application module has to start cluster */
- suspend fun startCluster(clusterInfo: ClusterInfo)
+ suspend fun <T> startCluster(configuration: T)
fun clusterJoined(): Boolean
+ fun isClient(): Boolean
+
+ fun isLiteMember(): Boolean
+
/** Returns [partitionGroup] master member */
suspend fun masterMember(partitionGroup: String): ClusterMember
/** Returns all the data cluster members */
suspend fun allMembers(): Set<ClusterMember>
- /** Returns data cluster members starting with prefix */
- suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember>
+ /**
+ * Returns application cluster members for [appName] joined as server or lite member,
+ * Node joined as client won't be visible. Here the assumption is node-id is combination of
+ * application id and replica number, for an example Application cds-cluster then the node ids will be
+ * cds-cluster-1, cds-cluster-2, cds-cluster-3
+ */
+ suspend fun applicationMembers(appName: String): Set<ClusterMember>
/** Create and get or get the distributed data map store with [name] */
suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
@@ -47,19 +57,25 @@ interface BluePrintClusterService {
data class ClusterInfo(
val id: String,
- var configFile: String? = null,
val nodeId: String,
- val nodeAddress: String,
- var clusterMembers: List<String>,
- var storagePath: String
+ var joinAsClient: Boolean = false,
+ var properties: Properties?,
+ var configFile: String
)
-data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
+data class ClusterMember(
+ val id: String,
+ val name: String,
+ val memberAddress: String?,
+ val state: String? = null
+)
interface ClusterLock {
fun name(): String
suspend fun lock()
+ suspend fun fenceLock(): String
suspend fun tryLock(timeout: Long): Boolean
+ suspend fun tryFenceLock(timeout: Long): String
suspend fun unLock()
fun isLocked(): Boolean
fun close()