diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
6 files changed, 63 insertions, 51 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt index 81fc0d709..0a58857f7 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -29,10 +29,10 @@ import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService /** - * Exposed Dependency Service by this Hazlecast Lib Module + * Exposed Dependency Service by this Hazelcast Lib Module */ fun BluePrintDependencyService.clusterService(): BluePrintClusterService = - instance(HazlecastClusterService::class) + instance(HazelcastClusterService::class) /** Optional Cluster Service, returns only if Cluster is enabled */ fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt index feb2a8e2a..d3c88d732 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterService.kt @@ -45,9 +45,9 @@ import java.time.Duration import java.util.concurrent.TimeUnit @Service -open class HazlecastClusterService : BluePrintClusterService { +open class HazelcastClusterService : BluePrintClusterService { - private val log = logger(HazlecastClusterService::class) + private val log = logger(HazelcastClusterService::class) lateinit var hazelcast: HazelcastInstance lateinit var cpSubsystemManagementService: CPSubsystemManagementService var joinedClient = false @@ -179,14 +179,14 @@ open class HazlecastClusterService : BluePrintClusterService { override suspend fun shutDown(duration: Duration) { if (::hazelcast.isInitialized && clusterJoined()) { delay(duration.toMillis()) - HazlecastClusterUtils.terminate(hazelcast) + HazelcastClusterUtils.terminate(hazelcast) } } /** Utils */ suspend fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { if (!joinedClient && !joinedLite) { - HazlecastClusterUtils.promoteAsCPMember(hazelcastInstance) + HazelcastClusterUtils.promoteAsCPMember(hazelcastInstance) } } @@ -243,17 +243,18 @@ open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val } override suspend fun unLock() { - // Added condition to avoid failures like - "Current thread is not owner of the lock!" - if (distributedLock.isLockedByCurrentThread) { - distributedLock.unlock() - log.trace("Cluster unlock(${name()}) successfully..") - } + distributedLock.unlock() + log.trace("Cluster unlock(${name()}) successfully..") } override fun isLocked(): Boolean { return distributedLock.isLocked } + override fun isLockedByCurrentThread(): Boolean { + return distributedLock.isLockedByCurrentThread + } + override suspend fun fenceLock(): String { val fence = distributedLock.lockAndGetFence() log.trace("Cluster lock($name) fence($fence) created..") diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt index 70970f6da..e5f488a0e 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterUtils.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterUtils.kt @@ -25,9 +25,9 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger import java.util.UUID import java.util.concurrent.TimeUnit -object HazlecastClusterUtils { +object HazelcastClusterUtils { - private val log = logger(HazlecastClusterUtils::class) + private val log = logger(HazelcastClusterUtils::class) /** Promote [hazelcastInstance] member to CP Member */ fun promoteAsCPMember(hazelcastInstance: HazelcastInstance) { diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index 9725553a5..2d957c289 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -78,6 +78,7 @@ interface ClusterLock { suspend fun tryFenceLock(timeout: Long): String suspend fun unLock() fun isLocked(): Boolean + fun isLockedByCurrentThread(): Boolean fun close() } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt index 80cf41558..e214b6593 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazelcastClusterServiceTest.kt @@ -20,13 +20,17 @@ import com.fasterxml.jackson.databind.JsonNode import com.hazelcast.client.config.YamlClientConfigBuilder import com.hazelcast.cluster.Member import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.instance.impl.HazelcastInstanceFactory import com.hazelcast.map.IMap import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay +import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext +import org.junit.After +import org.junit.Before import org.junit.Test import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo @@ -35,16 +39,21 @@ import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile import java.io.Serializable -import java.time.Duration import java.util.Properties import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue -class HazlecastClusterServiceTest { - private val log = logger(HazlecastClusterServiceTest::class) +class HazelcastClusterServiceTest { + private val log = logger(HazelcastClusterServiceTest::class) private val clusterSize = 3 + @Before + @After + fun killAllHazelcastInstances() { + HazelcastInstanceFactory.terminateAll() + } + @Test fun testClientFileSystemYamlConfig() { System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") @@ -74,33 +83,23 @@ class HazlecastClusterServiceTest { fun testClusterJoin() { runBlocking { val bluePrintClusterServiceOne = - createCluster(arrayListOf(5679, 5680, 5681)).toMutableList() - // delay(1000) - // Join as Hazlecast Management Node - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true) - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false) - // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + createCluster(arrayListOf(1, 2, 3)).toMutableList() printReachableMembers(bluePrintClusterServiceOne) testDistributedStore(bluePrintClusterServiceOne) testDistributedLock(bluePrintClusterServiceOne) - - // executeScheduler(bluePrintClusterServiceOne[0]) - // delay(1000) - // Shutdown - shutdown(bluePrintClusterServiceOne) } } private suspend fun createCluster( - ports: List<Int>, + ids: List<Int>, joinAsClient: Boolean? = false ): List<BluePrintClusterService> { return withContext(Dispatchers.Default) { - val deferred = ports.map { port -> + val deferred = ids.map { id -> async(Dispatchers.IO) { - val nodeId = "node-$port" - log.info("********** Starting node($nodeId) on port($port)") + val nodeId = "node-$id" + log.info("********** Starting ($nodeId)") val properties = Properties() properties["hazelcast.logging.type"] = "slf4j" val clusterInfo = @@ -117,21 +116,15 @@ class HazlecastClusterServiceTest { properties = properties ) } - val hazlecastClusterService = HazlecastClusterService() - hazlecastClusterService.startCluster(clusterInfo) - hazlecastClusterService + val hazelcastClusterService = HazelcastClusterService() + hazelcastClusterService.startCluster(clusterInfo) + hazelcastClusterService } } deferred.awaitAll() } } - private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) { - bluePrintClusterServices.forEach { bluePrintClusterService -> - bluePrintClusterService.shutDown(Duration.ofMillis(10)) - } - } - private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) { /** Test Distributed store creation */ repeat(2) { storeId -> @@ -159,13 +152,25 @@ class HazlecastClusterServiceTest { val lockName = "sample-lock" withContext(Dispatchers.IO) { val deferred = async { - executeLock(bluePrintClusterServices[0], "first", lockName) + newSingleThreadContext("first").use { + withContext(it) { + executeLock(bluePrintClusterServices[0], "first", lockName) + } + } } val deferred2 = async { - executeLock(bluePrintClusterServices[1], "second", lockName) + newSingleThreadContext("second").use { + withContext(it) { + executeLock(bluePrintClusterServices[1], "second", lockName) + } + } } val deferred3 = async { - executeLock(bluePrintClusterServices[2], "third", lockName) + newSingleThreadContext("third").use { + withContext(it) { + executeLock(bluePrintClusterServices[2], "third", lockName) + } + } } deferred.start() deferred2.start() @@ -195,12 +200,12 @@ class HazlecastClusterServiceTest { private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { log.info("initialising ...") - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap assertEquals(3, memberNameMap.size, "failed to match member size") memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } - val scheduler = hazlecastClusterService.clusterScheduler("cleanup") + val scheduler = hazelcastClusterService.clusterScheduler("cleanup") // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) @@ -209,15 +214,15 @@ class HazlecastClusterServiceTest { private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) { bluePrintClusterServices.forEach { bluePrintClusterService -> - val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService - val hazelcast = hazlecastClusterService.hazelcast + val hazelcastClusterService = bluePrintClusterService as HazelcastClusterService + val hazelcast = hazelcastClusterService.hazelcast val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null - val master = hazlecastClusterService.masterMember("system").memberAddress - val members = hazlecastClusterService.allMembers().map { it.memberAddress } + val master = hazelcastClusterService.masterMember("system").memberAddress + val members = hazelcastClusterService.allMembers().map { it.memberAddress } log.info("Cluster Members for($self): master($master) Members($members)") } - val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56") + val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-") assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") log.info("Cluster applicationMembers ($applicationMembers)") } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml index de6047a90..b4dc3454a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-cluster.yaml @@ -7,10 +7,15 @@ hazelcast: session-time-to-live-seconds: 60 session-heartbeat-interval-seconds: 5 missing-cp-member-auto-removal-seconds: 120 + metrics: + enabled: false network: join: - multicast: + tcp-ip: enabled: true + interface: 127.0.0.1 + multicast: + enabled: false # Specify 224.0.0.1 instead of default 224.2.2.3 since there's some issue # on macOs with docker installed and multicast address different than 224.0.0.1 # https://stackoverflow.com/questions/46341715/hazelcast-multicast-does-not-work-because-of-vboxnet-which-is-used-by-docker-mac |