From 65bb9d0d83762e8fa8e3ab568c801908eafa0686 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Wed, 5 Feb 2020 15:51:03 -0500 Subject: Cluster co-ordination with Hazelcast. Remove Atomix implementation, due to Kubernetes clustering issues. Cluster environment property changes. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh Change-Id: I23f40c92c0adc6b3ab8690871385f78525c76433 --- .../modules/commons/processor-core/pom.xml | 4 + .../core/cluster/BluePrintClusterExtensions.kt | 46 ++++ .../core/cluster/HazlecastClusterService.kt | 252 +++++++++++++++++++++ .../core/service/BluePrintClusterService.kt | 32 ++- .../core/cluster/HazlecastClusterServiceTest.kt | 231 +++++++++++++++++++ .../test/resources/hazelcast/hazelcast-5679.yaml | 18 ++ .../test/resources/hazelcast/hazelcast-5680.yaml | 18 ++ .../test/resources/hazelcast/hazelcast-5681.yaml | 18 ++ .../test/resources/hazelcast/hazelcast-5682.yaml | 18 ++ .../test/resources/hazelcast/hazelcast-client.yaml | 13 ++ .../src/test/resources/hazelcast/hazelcast.yaml | 18 ++ .../src/test/resources/logback-test.xml | 36 +++ 12 files changed, 696 insertions(+), 8 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml create mode 100644 ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml (limited to 'ms/blueprintsprocessor/modules/commons/processor-core') diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml b/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml index 2f5ae6624..d08c16781 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml @@ -32,6 +32,10 @@ Blueprints Processor Core + + com.hazelcast + hazelcast-all + org.onap.ccsdk.cds.blueprintsprocessor blueprint-proto diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt new file mode 100644 index 000000000..85d9d5c27 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -0,0 +1,46 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.cluster.Member +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService + +/** + * Exposed Dependency Service by this Hazlecast Lib Module + */ +fun BluePrintDependencyService.clusterService(): BluePrintClusterService = + instance(HazlecastClusterService::class) + +/** Optional Cluster Service, returns only if Cluster is enabled */ +fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { + return if (BluePrintConstants.CLUSTER_ENABLED) { + BluePrintDependencyService.clusterService() + } else null +} + +/** Extension to convert Hazelcast Member to Blueprints Cluster Member */ +fun Member.toClusterMember(): ClusterMember { + val memberName: String = this.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: this.uuid.toString() + return ClusterMember( + id = this.uuid.toString(), + name = memberName, + memberAddress = this.address.toString() + ) +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt new file mode 100644 index 000000000..83a04d653 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt @@ -0,0 +1,252 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.client.HazelcastClient +import com.hazelcast.client.config.ClientConfig +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.cluster.MembershipEvent +import com.hazelcast.cluster.MembershipListener +import com.hazelcast.config.Config +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.config.MemberAttributeConfig +import com.hazelcast.core.Hazelcast +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.lock.FencedLock +import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.TimeUnit + +@Service +open class HazlecastClusterService : BluePrintClusterService { + + private val log = logger(HazlecastClusterService::class) + lateinit var hazelcast: HazelcastInstance + var joinedClient = false + var joinedLite = false + + override suspend fun startCluster(configuration: T) { + /** Get the Hazelcast Cliet or Server instance */ + hazelcast = + when (configuration) { + is Config -> { + joinedLite = configuration.isLiteMember + Hazelcast.newHazelcastInstance(configuration) + } + is ClientConfig -> { + joinedClient = true + HazelcastClient.newHazelcastClient(configuration) + } + is ClusterInfo -> { + + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id) + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId) + + val memberAttributeConfig = MemberAttributeConfig() + memberAttributeConfig.setAttribute( + BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, + configuration.nodeId + ) + + val configFile = configuration.configFile + /** Check file exists */ + val clusterConfigFile = normalizedFile(configuration.configFile) + check(clusterConfigFile.absolutePath.endsWith("yaml", true)) { + "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml" + } + check(clusterConfigFile.exists()) { + "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})" + } + log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****") + + /** Hazelcast Client from config file */ + if (configuration.joinAsClient) { + /** Set the configuration file to system properties, so that Hazelcast will read automatically */ + System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath) + joinedClient = true + val hazelcastClientConfiguration = YamlClientConfigBuilder().build() + hazelcastClientConfiguration.properties = configuration.properties + HazelcastClient.newHazelcastClient(hazelcastClientConfiguration) + } else { + /** Hazelcast Server from config file */ + val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile)) + hazelcastServerConfiguration.properties = configuration.properties + hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig + joinedLite = hazelcastServerConfiguration.isLiteMember + Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) + } + } + else -> { + throw BluePrintProcessorException("couldn't understand the cluster configuration") + } + } + + /** Add the Membership Listeners */ + hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this)) + log.info( + "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." + ) + } + + override fun isClient(): Boolean { + return joinedClient + } + + override fun isLiteMember(): Boolean { + return joinedLite + } + + override fun clusterJoined(): Boolean { + return hazelcast.lifecycleService.isRunning + } + + override suspend fun masterMember(partitionGroup: String): ClusterMember { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.first().toClusterMember() + } + + override suspend fun allMembers(): Set { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.map { it.toClusterMember() }.toSet() + } + + override suspend fun applicationMembers(appName: String): Set { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet() + } + + override suspend fun clusterMapStore(name: String): MutableMap { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getMap(name) + } + + /** + * The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(hazelcast, name) + } + + /** Return interface may change and it will be included in BluePrintClusterService */ + @UseExperimental + suspend fun clusterScheduler(name: String): IScheduledExecutorService { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getScheduledExecutorService(name) + } + + override suspend fun shutDown(duration: Duration) { + if (::hazelcast.isInitialized && clusterJoined()) { + delay(duration.toMillis()) + hazelcast.lifecycleService.terminate() + } + } + + /** Utils */ + suspend fun myHazelcastApplicationMembers(): Map { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + check(!isClient()) { "not supported for cluster client members." } + return hazelcastApplicationMembers(ClusterUtils.applicationName()) + } + + suspend fun hazelcastApplicationMembers(appName: String): Map { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + val applicationMembers: MutableMap = hashMapOf() + hazelcast.cluster.members.map { member -> + val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) + if (memberName.startsWith(appName, true)) { + applicationMembers[memberName] = member + } + } + return applicationMembers + } +} + +open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) : + MembershipListener { + private val log = logger(BlueprintsClusterMembershipListener::class) + + override fun memberRemoved(membershipEvent: MembershipEvent) { + log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent") + } + + override fun memberAdded(membershipEvent: MembershipEvent) { + log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent") + } +} + +open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { + private val log = logger(ClusterLockImpl::class) + + lateinit var distributedLock: FencedLock + + override fun name(): String { + return distributedLock.name + } + + override suspend fun lock() { + distributedLock = hazelcast.cpSubsystem.getLock(name) + distributedLock.lock() + log.trace("Cluster lock($name) created..") + } + + override suspend fun tryLock(timeout: Long): Boolean { + distributedLock = hazelcast.cpSubsystem.getLock(name) + return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) + } + + override suspend fun unLock() { + distributedLock.unlock() + log.trace("Cluster unlock(${name()}) successfully..") + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked + } + + override suspend fun fenceLock(): String { + distributedLock = hazelcast.cpSubsystem.getLock(name) + val fence = distributedLock.lockAndGetFence() + log.trace("Cluster lock($name) fence($fence) created..") + return fence.toString() + } + + override suspend fun tryFenceLock(timeout: Long): String { + distributedLock = hazelcast.cpSubsystem.getLock(name) + return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() + } + + override fun close() { + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index f994628a2..53f18d38a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -17,23 +17,33 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.service import java.time.Duration +import java.util.Properties interface BluePrintClusterService { /** Start the cluster with [clusterInfo], By default clustering service is disabled. * Application module has to start cluster */ - suspend fun startCluster(clusterInfo: ClusterInfo) + suspend fun startCluster(configuration: T) fun clusterJoined(): Boolean + fun isClient(): Boolean + + fun isLiteMember(): Boolean + /** Returns [partitionGroup] master member */ suspend fun masterMember(partitionGroup: String): ClusterMember /** Returns all the data cluster members */ suspend fun allMembers(): Set - /** Returns data cluster members starting with prefix */ - suspend fun clusterMembersForPrefix(memberPrefix: String): Set + /** + * Returns application cluster members for [appName] joined as server or lite member, + * Node joined as client won't be visible. Here the assumption is node-id is combination of + * application id and replica number, for an example Application cds-cluster then the node ids will be + * cds-cluster-1, cds-cluster-2, cds-cluster-3 + */ + suspend fun applicationMembers(appName: String): Set /** Create and get or get the distributed data map store with [name] */ suspend fun clusterMapStore(name: String): MutableMap @@ -47,19 +57,25 @@ interface BluePrintClusterService { data class ClusterInfo( val id: String, - var configFile: String? = null, val nodeId: String, - val nodeAddress: String, - var clusterMembers: List, - var storagePath: String + var joinAsClient: Boolean = false, + var properties: Properties?, + var configFile: String ) -data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) +data class ClusterMember( + val id: String, + val name: String, + val memberAddress: String?, + val state: String? = null +) interface ClusterLock { fun name(): String suspend fun lock() + suspend fun fenceLock(): String suspend fun tryLock(timeout: Long): Boolean + suspend fun tryFenceLock(timeout: Long): String suspend fun unLock() fun isLocked(): Boolean fun close() diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt new file mode 100644 index 000000000..b298eacae --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt @@ -0,0 +1,231 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.fasterxml.jackson.databind.JsonNode +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.map.IMap +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import java.io.Serializable +import java.time.Duration +import java.util.Properties +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +class HazlecastClusterServiceTest { + private val log = logger(HazlecastClusterServiceTest::class) + private val clusterSize = 3 + + @Test + fun testClientFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + System.setProperty( + "hazelcast.client.config", + normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath + ) + val config = YamlClientConfigBuilder().build() + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testServerFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml") + val config = FileSystemYamlConfig(configFile) + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testClusterJoin() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(5679, 5680, 5681)).toMutableList() + // delay(1000) + // Join as Hazlecast Management Node + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true) + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false) + // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + printReachableMembers(bluePrintClusterServiceOne) + testDistributedStore(bluePrintClusterServiceOne) + testDistributedLock(bluePrintClusterServiceOne) + + // executeScheduler(bluePrintClusterServiceOne[0]) + // delay(1000) + // Shutdown + shutdown(bluePrintClusterServiceOne) + } + } + + private suspend fun createCluster( + ports: List, + joinAsClient: Boolean? = false + ): List { + + return withContext(Dispatchers.Default) { + val deferred = ports.map { port -> + async(Dispatchers.IO) { + val nodeId = "node-$port" + log.info("********** Starting node($nodeId) on port($port)") + val properties = Properties() + properties["hazelcast.logging.type"] = "slf4j" + val clusterInfo = + if (joinAsClient!!) { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = true, + configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml", + properties = properties + ) + } else { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = false, + configFile = "./src/test/resources/hazelcast/hazelcast-$port.yaml", + properties = properties + ) + } + val hazlecastClusterService = HazlecastClusterService() + hazlecastClusterService.startCluster(clusterInfo) + hazlecastClusterService + } + } + deferred.awaitAll() + } + } + + private suspend fun shutdown(bluePrintClusterServices: List) { + bluePrintClusterServices.forEach { bluePrintClusterService -> + bluePrintClusterService.shutDown(Duration.ofMillis(10)) + } + } + + private suspend fun testDistributedStore(bluePrintClusterServices: List) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = bluePrintClusterServices[0].clusterMapStore( + "blueprint-runtime-$storeId" + ) as IMap + assertNotNull(store, "failed to get store") + repeat(5) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + + val store1 = bluePrintClusterServices[1].clusterMapStore( + "blueprint-runtime-$storeId" + ) as IMap + + store1.values.map { + log.trace("Received map event : $it") + } + delay(5) + store.clear() + } + } + + private suspend fun testDistributedLock(bluePrintClusterServices: List) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + executeLock(bluePrintClusterServices[0], "first", lockName) + } + val deferred2 = async { + executeLock(bluePrintClusterServices[0], "second", lockName) + } + val deferred3 = async { + executeLock(bluePrintClusterServices[2], "third", lockName) + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock( + bluePrintClusterService: BluePrintClusterService, + lockId: String, + lockName: String + ) { + log.info("initialising $lockId lock...") + val distributedLock = bluePrintClusterService.clusterLock(lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked(), "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unLock() + log.info("$lockId lock released") + } + distributedLock.close() + } + + private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { + log.info("initialising ...") + val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService + + val memberNameMap = bluePrintClusterService.clusterMapStore("member-name-map") as IMap + assertEquals(3, memberNameMap.size, "failed to match member size") + memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } + val scheduler = hazlecastClusterService.clusterScheduler("cleanup") + // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) + // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) + // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) + // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS) + } + + private suspend fun printReachableMembers(bluePrintClusterServices: List) { + bluePrintClusterServices.forEach { bluePrintClusterService -> + val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService + val hazelcast = hazlecastClusterService.hazelcast + val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null + val master = hazlecastClusterService.masterMember("system").memberAddress + val members = hazlecastClusterService.allMembers().map { it.memberAddress } + log.info("Cluster Members for($self): master($master) Members($members)") + } + + val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56") + assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") + log.info("Cluster applicationMembers ($applicationMembers)") + } +} + +open class SampleSchedulerTask : Runnable, Serializable { + private val log = logger(SampleSchedulerTask::class) + override fun run() { + log.info("I am scheduler action") + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml new file mode 100644 index 000000000..cbf488c95 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5679 + lite-member: + enabled: false + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml new file mode 100644 index 000000000..356be1d05 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5680 + lite-member: + enabled: false + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml new file mode 100644 index 000000000..d256f49e3 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5681 + lite-member: + enabled: false + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml new file mode 100644 index 000000000..9c7d566db --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5682 + lite-member: + enabled: true + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml new file mode 100644 index 000000000..e60b5dfc4 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml @@ -0,0 +1,13 @@ +hazelcast-client: + cluster-name: ${CLUSTER_ID} + instance-name: ${CLUSTER_NODE_ID} + + network: + cluster-members: + - 127.0.0.1:5701 +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml new file mode 100644 index 000000000..dcecf454f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: ${CLUSTER_NODE_ID} + lite-member: + enabled: true + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml new file mode 100644 index 000000000..5275f4029 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml @@ -0,0 +1,36 @@ + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{100} - %msg%n + + + + + + + + + + + + + + -- cgit 1.2.3-korg