aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2020-02-05 15:51:03 -0500
committerBrinda Santh <bs2796@att.com>2020-02-12 14:16:28 -0500
commit65bb9d0d83762e8fa8e3ab568c801908eafa0686 (patch)
treead8ae7fafc1954b44ddd9b72d1fceb482f042431 /ms/blueprintsprocessor/modules
parent723cb0b0f4fca052561f21bb8312bf7c6e8cd524 (diff)
Cluster co-ordination with Hazelcast.
Remove Atomix implementation, due to Kubernetes clustering issues. Cluster environment property changes. Issue-ID: CCSDK-2011 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I23f40c92c0adc6b3ab8690871385f78525c76433
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml56
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt26
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt184
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt114
-rw-r--r--ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt152
-rwxr-xr-xms/blueprintsprocessor/modules/commons/pom.xml1
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/pom.xml4
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt (renamed from ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt)23
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt252
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt231
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml18
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml18
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml18
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml18
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml13
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml18
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml (renamed from ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml)4
19 files changed, 632 insertions, 553 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index 50cc44279..20aef3498 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -238,7 +238,6 @@ object BluePrintConstants {
const val PROPERTY_CLUSTER_ID = "CLUSTER_ID"
const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID"
const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS"
- const val PROPERTY_CLUSTER_MEMBERS = "CLUSTER_MEMBERS"
- const val PROPERTY_CLUSTER_STORAGE_PATH = "CLUSTER_STORAGE_PATH"
+ const val PROPERTY_CLUSTER_JOIN_AS_CLIENT = "CLUSTER_JOIN_AS_CLIENT"
const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE"
}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml
deleted file mode 100644
index 7fa7b452a..000000000
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright © 2018-2019 AT&T Intellectual Property.
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>commons</artifactId>
- <version>0.7.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>atomix-lib</artifactId>
- <packaging>jar</packaging>
-
- <name>Blueprints Processor Atomix Lib</name>
- <description>Blueprints Processor Atomix Lib</description>
-
- <dependencies>
- <dependency>
- <groupId>io.atomix</groupId>
- <artifactId>atomix</artifactId>
- </dependency>
- <dependency>
- <groupId>io.atomix</groupId>
- <artifactId>atomix-raft</artifactId>
- </dependency>
- <dependency>
- <groupId>io.atomix</groupId>
- <artifactId>atomix-primary-backup</artifactId>
- </dependency>
- <dependency>
- <groupId>io.atomix</groupId>
- <artifactId>atomix-gossip</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
- <artifactId>db-lib</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
deleted file mode 100644
index 17d243620..000000000
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.atomix
-
-import com.fasterxml.jackson.databind.JsonNode
-import io.atomix.core.map.DistributedMap
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
-
-fun <T : Map<String, JsonNode>> T.toDistributedMap(): DistributedMap<String, JsonNode> {
- return if (this != null && this is DistributedMap<*, *>) this as DistributedMap<String, JsonNode>
- else throw BluePrintProcessorException("map is not of type DistributedMap")
-}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
deleted file mode 100644
index 214a14310..000000000
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service
-
-import io.atomix.cluster.ClusterMembershipEvent
-import io.atomix.core.Atomix
-import io.atomix.core.lock.DistributedLock
-import kotlinx.coroutines.delay
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.springframework.stereotype.Service
-import java.time.Duration
-import java.util.concurrent.CompletableFuture
-
-@Service
-open class AtomixBluePrintClusterService : BluePrintClusterService {
-
- private val log = logger(AtomixBluePrintClusterService::class)
-
- lateinit var atomix: Atomix
-
- override suspend fun startCluster(clusterInfo: ClusterInfo) {
- log.info(
- "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
- "starting with members(${clusterInfo.clusterMembers})"
- )
-
- /** Create Atomix cluster either from config file or default multi-cast cluster*/
- atomix = if (!clusterInfo.configFile.isNullOrEmpty()) {
- AtomixLibUtils.configAtomix(clusterInfo.configFile!!)
- } else {
- AtomixLibUtils.defaultMulticastAtomix(clusterInfo)
- }
-
- /** Listen for the member chaneg events */
- atomix.membershipService.addListener { membershipEvent ->
- when (membershipEvent.type()) {
- ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}")
- ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}")
- ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}")
- ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}")
- else -> log.info("Member event unknown")
- }
- }
- /** Start and Join the Cluster */
- atomix.start().join()
- log.info(
- "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " +
- "created successfully...."
- )
-
- /** Receive ping from network */
- val pingHandler = { message: String ->
- log.info("####### ping message received : $message")
- CompletableFuture.completedFuture(message)
- }
- atomix.communicationService.subscribe("ping", pingHandler)
-
- /** Ping the network */
- atomix.communicationService.broadcast(
- "ping",
- "ping from node(${clusterInfo.nodeId})"
- )
- }
-
- override fun clusterJoined(): Boolean {
- return atomix.isRunning
- }
-
- override suspend fun masterMember(partitionGroup: String): ClusterMember {
- check(::atomix.isInitialized) { "failed to start and join cluster" }
- check(atomix.isRunning) { "cluster is not running" }
- val masterId = atomix.partitionService
- .getPartitionGroup(partitionGroup)
- .getPartition("1").primary()
- val masterMember = atomix.membershipService.getMember(masterId)
- return ClusterMember(
- id = masterMember.id().id(),
- memberAddress = masterMember.address().toString()
- )
- }
-
- override suspend fun allMembers(): Set<ClusterMember> {
- check(::atomix.isInitialized) { "failed to start and join cluster" }
- check(atomix.isRunning) { "cluster is not running" }
-
- return atomix.membershipService.members.map {
- ClusterMember(
- id = it.id().id(),
- memberAddress = it.address().toString()
- )
- }.toSet()
- }
-
- override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> {
- check(::atomix.isInitialized) { "failed to start and join cluster" }
- check(atomix.isRunning) { "cluster is not running" }
-
- return atomix.membershipService.members.filter {
- it.id().id().startsWith(memberPrefix, true)
- }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) }
- .toSet()
- }
-
- override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
- check(::atomix.isInitialized) { "failed to start and join cluster" }
- return AtomixLibUtils.distributedMapStore<T>(atomix, name)
- }
-
- /** The DistributedLock is a distributed implementation of Java’s Lock.
- * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
- * determine ordering of multiple concurrent lock holders.
- * DistributedLocks are designed to account for failures within the cluster.
- * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
- * the lock will be released and granted to the next waiting process. *
- */
- override suspend fun clusterLock(name: String): ClusterLock {
- check(::atomix.isInitialized) { "failed to start and join cluster" }
- return ClusterLockImpl(atomix, name)
- }
-
- override suspend fun shutDown(duration: Duration) {
- if (::atomix.isInitialized) {
- val shutDownMilli = duration.toMillis()
- log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms")
- delay(shutDownMilli)
- atomix.stop()
- }
- }
-}
-
-open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock {
- val log = logger(ClusterLockImpl::class)
-
- lateinit var distributedLock: DistributedLock
-
- override fun name(): String {
- return distributedLock.name()
- }
-
- override suspend fun lock() {
- distributedLock = AtomixLibUtils.distributedLock(atomix, name)
- distributedLock.lock()
- log.debug("Cluster lock($name) created..")
- }
-
- override suspend fun tryLock(timeout: Long): Boolean {
- distributedLock = AtomixLibUtils.distributedLock(atomix, name)
- return distributedLock.tryLock(Duration.ofMillis(timeout))
- }
-
- override suspend fun unLock() {
- distributedLock.unlock()
- log.debug("Cluster unlock(${name()}) successfully..")
- }
-
- override fun isLocked(): Boolean {
- return distributedLock.isLocked
- }
-
- override fun close() {
- if (::distributedLock.isInitialized) {
- distributedLock.close()
- }
- }
-}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
deleted file mode 100644
index 9be15f2e3..000000000
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils
-
-import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.node.ArrayNode
-import com.fasterxml.jackson.databind.node.MissingNode
-import com.fasterxml.jackson.databind.node.NullNode
-import com.fasterxml.jackson.databind.node.ObjectNode
-import io.atomix.core.Atomix
-import io.atomix.core.lock.AtomicLock
-import io.atomix.core.lock.DistributedLock
-import io.atomix.core.map.DistributedMap
-import io.atomix.protocols.backup.MultiPrimaryProtocol
-import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup
-import io.atomix.protocols.raft.partition.RaftPartitionGroup
-import io.atomix.utils.net.Address
-import org.jsoup.nodes.TextNode
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
-
-object AtomixLibUtils {
- private val log = logger(AtomixLibUtils::class)
-
- fun configAtomix(filePath: String): Atomix {
- val configFile = normalizedFile(filePath)
- return Atomix.builder(configFile.absolutePath).build()
- }
-
- fun defaultMulticastAtomix(
- clusterInfo: ClusterInfo,
- raftPartitions: Int = 1,
- primaryBackupPartitions: Int = 32
- ): Atomix {
-
- val nodeId = clusterInfo.nodeId
-
- val raftPartitionGroup = RaftPartitionGroup.builder("system")
- .withNumPartitions(raftPartitions)
- .withMembers(clusterInfo.clusterMembers)
- .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId"))
- .build()
-
- val primaryBackupGroup =
- PrimaryBackupPartitionGroup.builder("data")
- .withNumPartitions(primaryBackupPartitions)
- .build()
-
- return Atomix.builder()
- .withMemberId(nodeId)
- .withAddress(Address.from(clusterInfo.nodeAddress))
- .withManagementGroup(raftPartitionGroup)
- .withPartitionGroups(primaryBackupGroup)
- .withMulticastEnabled()
- .build()
- }
-
- fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> {
- check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" }
-
- val protocol = MultiPrimaryProtocol.builder()
- .withBackups(numBackups)
- .build()
-
- return atomix.mapBuilder<String, T>(storeName)
- .withProtocol(protocol)
- .withCacheEnabled()
- .withValueType(JsonNode::class.java)
- .withExtraTypes(
- JsonNode::class.java, TextNode::class.java, ObjectNode::class.java,
- ArrayNode::class.java, NullNode::class.java, MissingNode::class.java
- )
- .build()
- }
-
- fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock {
- check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" }
-
- val protocol = MultiPrimaryProtocol.builder()
- .withBackups(numBackups)
- .build()
- return atomix.lockBuilder(lockName)
- .withProtocol(protocol)
- .build()
- }
-
- /** get Atomic distributed lock, to get lock fence information */
- fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock {
- check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" }
-
- val protocol = MultiPrimaryProtocol.builder()
- .withBackups(numBackups)
- .build()
-
- return atomix.atomicLockBuilder(lockName)
- .withProtocol(protocol)
- .build()
- }
-}
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
deleted file mode 100644
index 67bf4cabb..000000000
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.atomix
-
-import com.fasterxml.jackson.databind.JsonNode
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.async
-import kotlinx.coroutines.awaitAll
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
-import org.junit.Before
-import org.junit.Test
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
-import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
-
-class AtomixBluePrintClusterServiceTest {
- private val log = logger(AtomixBluePrintClusterServiceTest::class)
-
- @Before
- fun init() {
- runBlocking {
- deleteNBDir("target/cluster")
- }
- }
-
- /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/
- @Test
- fun testClusterJoin() {
- runBlocking {
- val bluePrintClusterServiceOne =
- createCluster(arrayListOf(5679, 5680)).toMutableList()
- // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680))
- // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
- val bluePrintClusterService = bluePrintClusterServiceOne[0]
- log.info("Members : ${bluePrintClusterService.allMembers()}")
- log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}")
- log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}")
- testDistributedStore(bluePrintClusterServiceOne)
- testDistributedLock(bluePrintClusterServiceOne)
- }
- }
-
- private suspend fun createCluster(
- ports: List<Int>,
- otherClusterPorts: List<Int>? = null
- ): List<BluePrintClusterService> {
-
- return withContext(Dispatchers.Default) {
- val clusterMembers = ports.map { "node-$it" }.toMutableList()
- /** Add the other cluster as members */
- if (!otherClusterPorts.isNullOrEmpty()) {
- val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList()
- clusterMembers.addAll(otherClusterMembers)
- }
- val deferred = ports.map { port ->
- async(Dispatchers.IO) {
- val nodeId = "node-$port"
- log.info("********** Starting node($nodeId) on port($port)")
- val clusterInfo = ClusterInfo(
- id = "test-cluster", nodeId = nodeId,
- clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster"
- )
- val atomixClusterService = AtomixBluePrintClusterService()
- atomixClusterService.startCluster(clusterInfo)
- atomixClusterService
- }
- }
- deferred.awaitAll()
- }
- }
-
- private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
- /** Test Distributed store creation */
- repeat(2) { storeId ->
- val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
- "blueprint-runtime-$storeId"
- ).toDistributedMap()
- assertNotNull(store, "failed to get store")
- val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
- "blueprint-runtime-$storeId"
- ).toDistributedMap()
-
- store1.addListener {
- log.info("Received map event : $it")
- }
- repeat(5) {
- store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
- }
- delay(10)
- store.close()
- }
- }
-
- private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
- val lockName = "sample-lock"
- withContext(Dispatchers.IO) {
- val deferred = async {
- executeLock(bluePrintClusterServices[0], "first", lockName)
- }
- val deferred2 = async {
- executeLock(bluePrintClusterServices[0], "second", lockName)
- }
- val deferred3 = async {
- executeLock(bluePrintClusterServices[1], "third", lockName)
- }
- deferred.start()
- deferred2.start()
- deferred3.start()
- }
- }
-
- private suspend fun executeLock(
- bluePrintClusterService: BluePrintClusterService,
- lockId: String,
- lockName: String
- ) {
- log.info("initialising $lockId lock...")
- val distributedLock = bluePrintClusterService.clusterLock(lockName)
- assertNotNull(distributedLock, "failed to create distributed $lockId lock")
- distributedLock.lock()
- assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
- try {
- log.info("locked $lockId process for 5mSec")
- delay(5)
- } finally {
- distributedLock.unLock()
- log.info("$lockId lock released")
- }
- distributedLock.close()
- }
-}
diff --git a/ms/blueprintsprocessor/modules/commons/pom.xml b/ms/blueprintsprocessor/modules/commons/pom.xml
index 18ef63469..bc1616d82 100755
--- a/ms/blueprintsprocessor/modules/commons/pom.xml
+++ b/ms/blueprintsprocessor/modules/commons/pom.xml
@@ -34,7 +34,6 @@
<modules>
<module>processor-core</module>
- <module>atomix-lib</module>
<module>db-lib</module>
<module>rest-lib</module>
<module>dmaap-lib</module>
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml b/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml
index 2f5ae6624..d08c16781 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml
@@ -32,6 +32,10 @@
<description>Blueprints Processor Core</description>
<dependencies>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast-all</artifactId>
+ </dependency>
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>blueprint-proto</artifactId>
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
index 8ef290303..85d9d5c27 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt
@@ -14,22 +14,19 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.atomix
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
+import com.hazelcast.cluster.Member
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-import org.springframework.context.annotation.Configuration
-
-@Configuration
-open class BluePrintAtomixLibConfiguration
/**
- * Exposed Dependency Service by this Atomix Lib Module
+ * Exposed Dependency Service by this Hazlecast Lib Module
*/
fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
- instance(AtomixBluePrintClusterService::class)
+ instance(HazlecastClusterService::class)
/** Optional Cluster Service, returns only if Cluster is enabled */
fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
@@ -37,3 +34,13 @@ fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService
BluePrintDependencyService.clusterService()
} else null
}
+
+/** Extension to convert Hazelcast Member to Blueprints Cluster Member */
+fun Member.toClusterMember(): ClusterMember {
+ val memberName: String = this.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: this.uuid.toString()
+ return ClusterMember(
+ id = this.uuid.toString(),
+ name = memberName,
+ memberAddress = this.address.toString()
+ )
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
new file mode 100644
index 000000000..83a04d653
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt
@@ -0,0 +1,252 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+import com.hazelcast.client.HazelcastClient
+import com.hazelcast.client.config.ClientConfig
+import com.hazelcast.client.config.YamlClientConfigBuilder
+import com.hazelcast.cluster.Member
+import com.hazelcast.cluster.MembershipEvent
+import com.hazelcast.cluster.MembershipListener
+import com.hazelcast.config.Config
+import com.hazelcast.config.FileSystemYamlConfig
+import com.hazelcast.config.MemberAttributeConfig
+import com.hazelcast.core.Hazelcast
+import com.hazelcast.core.HazelcastInstance
+import com.hazelcast.cp.lock.FencedLock
+import com.hazelcast.scheduledexecutor.IScheduledExecutorService
+import kotlinx.coroutines.delay
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+import org.springframework.stereotype.Service
+import java.time.Duration
+import java.util.concurrent.TimeUnit
+
+@Service
+open class HazlecastClusterService : BluePrintClusterService {
+
+ private val log = logger(HazlecastClusterService::class)
+ lateinit var hazelcast: HazelcastInstance
+ var joinedClient = false
+ var joinedLite = false
+
+ override suspend fun <T> startCluster(configuration: T) {
+ /** Get the Hazelcast Cliet or Server instance */
+ hazelcast =
+ when (configuration) {
+ is Config -> {
+ joinedLite = configuration.isLiteMember
+ Hazelcast.newHazelcastInstance(configuration)
+ }
+ is ClientConfig -> {
+ joinedClient = true
+ HazelcastClient.newHazelcastClient(configuration)
+ }
+ is ClusterInfo -> {
+
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id)
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId)
+
+ val memberAttributeConfig = MemberAttributeConfig()
+ memberAttributeConfig.setAttribute(
+ BluePrintConstants.PROPERTY_CLUSTER_NODE_ID,
+ configuration.nodeId
+ )
+
+ val configFile = configuration.configFile
+ /** Check file exists */
+ val clusterConfigFile = normalizedFile(configuration.configFile)
+ check(clusterConfigFile.absolutePath.endsWith("yaml", true)) {
+ "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml"
+ }
+ check(clusterConfigFile.exists()) {
+ "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})"
+ }
+ log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****")
+
+ /** Hazelcast Client from config file */
+ if (configuration.joinAsClient) {
+ /** Set the configuration file to system properties, so that Hazelcast will read automatically */
+ System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath)
+ joinedClient = true
+ val hazelcastClientConfiguration = YamlClientConfigBuilder().build()
+ hazelcastClientConfiguration.properties = configuration.properties
+ HazelcastClient.newHazelcastClient(hazelcastClientConfiguration)
+ } else {
+ /** Hazelcast Server from config file */
+ val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile))
+ hazelcastServerConfiguration.properties = configuration.properties
+ hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig
+ joinedLite = hazelcastServerConfiguration.isLiteMember
+ Hazelcast.newHazelcastInstance(hazelcastServerConfiguration)
+ }
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't understand the cluster configuration")
+ }
+ }
+
+ /** Add the Membership Listeners */
+ hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this))
+ log.info(
+ "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...."
+ )
+ }
+
+ override fun isClient(): Boolean {
+ return joinedClient
+ }
+
+ override fun isLiteMember(): Boolean {
+ return joinedLite
+ }
+
+ override fun clusterJoined(): Boolean {
+ return hazelcast.lifecycleService.isRunning
+ }
+
+ override suspend fun masterMember(partitionGroup: String): ClusterMember {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.cluster.members.first().toClusterMember()
+ }
+
+ override suspend fun allMembers(): Set<ClusterMember> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.cluster.members.map { it.toClusterMember() }.toSet()
+ }
+
+ override suspend fun applicationMembers(appName: String): Set<ClusterMember> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet()
+ }
+
+ override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.getMap<String, T>(name)
+ }
+
+ /**
+ * The DistributedLock is a distributed implementation of Java’s Lock.
+ * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to
+ * determine ordering of multiple concurrent lock holders.
+ * DistributedLocks are designed to account for failures within the cluster.
+ * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled,
+ * the lock will be released and granted to the next waiting process.
+ */
+ override suspend fun clusterLock(name: String): ClusterLock {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return ClusterLockImpl(hazelcast, name)
+ }
+
+ /** Return interface may change and it will be included in BluePrintClusterService */
+ @UseExperimental
+ suspend fun clusterScheduler(name: String): IScheduledExecutorService {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ return hazelcast.getScheduledExecutorService(name)
+ }
+
+ override suspend fun shutDown(duration: Duration) {
+ if (::hazelcast.isInitialized && clusterJoined()) {
+ delay(duration.toMillis())
+ hazelcast.lifecycleService.terminate()
+ }
+ }
+
+ /** Utils */
+ suspend fun myHazelcastApplicationMembers(): Map<String, Member> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ check(!isClient()) { "not supported for cluster client members." }
+ return hazelcastApplicationMembers(ClusterUtils.applicationName())
+ }
+
+ suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> {
+ check(::hazelcast.isInitialized) { "failed to start and join cluster" }
+ val applicationMembers: MutableMap<String, Member> = hashMapOf()
+ hazelcast.cluster.members.map { member ->
+ val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID)
+ if (memberName.startsWith(appName, true)) {
+ applicationMembers[memberName] = member
+ }
+ }
+ return applicationMembers
+ }
+}
+
+open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) :
+ MembershipListener {
+ private val log = logger(BlueprintsClusterMembershipListener::class)
+
+ override fun memberRemoved(membershipEvent: MembershipEvent) {
+ log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent")
+ }
+
+ override fun memberAdded(membershipEvent: MembershipEvent) {
+ log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent")
+ }
+}
+
+open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock {
+ private val log = logger(ClusterLockImpl::class)
+
+ lateinit var distributedLock: FencedLock
+
+ override fun name(): String {
+ return distributedLock.name
+ }
+
+ override suspend fun lock() {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ distributedLock.lock()
+ log.trace("Cluster lock($name) created..")
+ }
+
+ override suspend fun tryLock(timeout: Long): Boolean {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS)
+ }
+
+ override suspend fun unLock() {
+ distributedLock.unlock()
+ log.trace("Cluster unlock(${name()}) successfully..")
+ }
+
+ override fun isLocked(): Boolean {
+ return distributedLock.isLocked
+ }
+
+ override suspend fun fenceLock(): String {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ val fence = distributedLock.lockAndGetFence()
+ log.trace("Cluster lock($name) fence($fence) created..")
+ return fence.toString()
+ }
+
+ override suspend fun tryFenceLock(timeout: Long): String {
+ distributedLock = hazelcast.cpSubsystem.getLock(name)
+ return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString()
+ }
+
+ override fun close() {
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
index f994628a2..53f18d38a 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt
@@ -17,23 +17,33 @@
package org.onap.ccsdk.cds.blueprintsprocessor.core.service
import java.time.Duration
+import java.util.Properties
interface BluePrintClusterService {
/** Start the cluster with [clusterInfo], By default clustering service is disabled.
* Application module has to start cluster */
- suspend fun startCluster(clusterInfo: ClusterInfo)
+ suspend fun <T> startCluster(configuration: T)
fun clusterJoined(): Boolean
+ fun isClient(): Boolean
+
+ fun isLiteMember(): Boolean
+
/** Returns [partitionGroup] master member */
suspend fun masterMember(partitionGroup: String): ClusterMember
/** Returns all the data cluster members */
suspend fun allMembers(): Set<ClusterMember>
- /** Returns data cluster members starting with prefix */
- suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember>
+ /**
+ * Returns application cluster members for [appName] joined as server or lite member,
+ * Node joined as client won't be visible. Here the assumption is node-id is combination of
+ * application id and replica number, for an example Application cds-cluster then the node ids will be
+ * cds-cluster-1, cds-cluster-2, cds-cluster-3
+ */
+ suspend fun applicationMembers(appName: String): Set<ClusterMember>
/** Create and get or get the distributed data map store with [name] */
suspend fun <T> clusterMapStore(name: String): MutableMap<String, T>
@@ -47,19 +57,25 @@ interface BluePrintClusterService {
data class ClusterInfo(
val id: String,
- var configFile: String? = null,
val nodeId: String,
- val nodeAddress: String,
- var clusterMembers: List<String>,
- var storagePath: String
+ var joinAsClient: Boolean = false,
+ var properties: Properties?,
+ var configFile: String
)
-data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null)
+data class ClusterMember(
+ val id: String,
+ val name: String,
+ val memberAddress: String?,
+ val state: String? = null
+)
interface ClusterLock {
fun name(): String
suspend fun lock()
+ suspend fun fenceLock(): String
suspend fun tryLock(timeout: Long): Boolean
+ suspend fun tryFenceLock(timeout: Long): String
suspend fun unLock()
fun isLocked(): Boolean
fun close()
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt
new file mode 100644
index 000000000..b298eacae
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt
@@ -0,0 +1,231 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.hazelcast.client.config.YamlClientConfigBuilder
+import com.hazelcast.cluster.Member
+import com.hazelcast.config.FileSystemYamlConfig
+import com.hazelcast.map.IMap
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+import java.io.Serializable
+import java.time.Duration
+import java.util.Properties
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
+
+class HazlecastClusterServiceTest {
+ private val log = logger(HazlecastClusterServiceTest::class)
+ private val clusterSize = 3
+
+ @Test
+ fun testClientFileSystemYamlConfig() {
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
+ System.setProperty(
+ "hazelcast.client.config",
+ normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath
+ )
+ val config = YamlClientConfigBuilder().build()
+ assertNotNull(config)
+ assertEquals("test-cluster", config.clusterName)
+ assertEquals("node-1234", config.instanceName)
+ }
+
+ @Test
+ fun testServerFileSystemYamlConfig() {
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster")
+ System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234")
+ val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml")
+ val config = FileSystemYamlConfig(configFile)
+ assertNotNull(config)
+ assertEquals("test-cluster", config.clusterName)
+ assertEquals("node-1234", config.instanceName)
+ }
+
+ @Test
+ fun testClusterJoin() {
+ runBlocking {
+ val bluePrintClusterServiceOne =
+ createCluster(arrayListOf(5679, 5680, 5681)).toMutableList()
+ // delay(1000)
+ // Join as Hazlecast Management Node
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true)
+ // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false)
+ // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo)
+ printReachableMembers(bluePrintClusterServiceOne)
+ testDistributedStore(bluePrintClusterServiceOne)
+ testDistributedLock(bluePrintClusterServiceOne)
+
+ // executeScheduler(bluePrintClusterServiceOne[0])
+ // delay(1000)
+ // Shutdown
+ shutdown(bluePrintClusterServiceOne)
+ }
+ }
+
+ private suspend fun createCluster(
+ ports: List<Int>,
+ joinAsClient: Boolean? = false
+ ): List<BluePrintClusterService> {
+
+ return withContext(Dispatchers.Default) {
+ val deferred = ports.map { port ->
+ async(Dispatchers.IO) {
+ val nodeId = "node-$port"
+ log.info("********** Starting node($nodeId) on port($port)")
+ val properties = Properties()
+ properties["hazelcast.logging.type"] = "slf4j"
+ val clusterInfo =
+ if (joinAsClient!!) {
+ ClusterInfo(
+ id = "test-cluster", nodeId = nodeId, joinAsClient = true,
+ configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml",
+ properties = properties
+ )
+ } else {
+ ClusterInfo(
+ id = "test-cluster", nodeId = nodeId, joinAsClient = false,
+ configFile = "./src/test/resources/hazelcast/hazelcast-$port.yaml",
+ properties = properties
+ )
+ }
+ val hazlecastClusterService = HazlecastClusterService()
+ hazlecastClusterService.startCluster(clusterInfo)
+ hazlecastClusterService
+ }
+ }
+ deferred.awaitAll()
+ }
+ }
+
+ private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) {
+ bluePrintClusterServices.forEach { bluePrintClusterService ->
+ bluePrintClusterService.shutDown(Duration.ofMillis(10))
+ }
+ }
+
+ private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) {
+ /** Test Distributed store creation */
+ repeat(2) { storeId ->
+ val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>(
+ "blueprint-runtime-$storeId"
+ ) as IMap
+ assertNotNull(store, "failed to get store")
+ repeat(5) {
+ store["key-$storeId-$it"] = "value-$it".asJsonPrimitive()
+ }
+
+ val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>(
+ "blueprint-runtime-$storeId"
+ ) as IMap
+
+ store1.values.map {
+ log.trace("Received map event : $it")
+ }
+ delay(5)
+ store.clear()
+ }
+ }
+
+ private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) {
+ val lockName = "sample-lock"
+ withContext(Dispatchers.IO) {
+ val deferred = async {
+ executeLock(bluePrintClusterServices[0], "first", lockName)
+ }
+ val deferred2 = async {
+ executeLock(bluePrintClusterServices[0], "second", lockName)
+ }
+ val deferred3 = async {
+ executeLock(bluePrintClusterServices[2], "third", lockName)
+ }
+ deferred.start()
+ deferred2.start()
+ deferred3.start()
+ }
+ }
+
+ private suspend fun executeLock(
+ bluePrintClusterService: BluePrintClusterService,
+ lockId: String,
+ lockName: String
+ ) {
+ log.info("initialising $lockId lock...")
+ val distributedLock = bluePrintClusterService.clusterLock(lockName)
+ assertNotNull(distributedLock, "failed to create distributed $lockId lock")
+ distributedLock.lock()
+ assertTrue(distributedLock.isLocked(), "failed to lock $lockId")
+ try {
+ log.info("locked $lockId process for 5mSec")
+ delay(5)
+ } finally {
+ distributedLock.unLock()
+ log.info("$lockId lock released")
+ }
+ distributedLock.close()
+ }
+
+ private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) {
+ log.info("initialising ...")
+ val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
+
+ val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap
+ assertEquals(3, memberNameMap.size, "failed to match member size")
+ memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") }
+ val scheduler = hazlecastClusterService.clusterScheduler("cleanup")
+ // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS)
+ // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS)
+ // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS)
+ // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS)
+ }
+
+ private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) {
+ bluePrintClusterServices.forEach { bluePrintClusterService ->
+ val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService
+ val hazelcast = hazlecastClusterService.hazelcast
+ val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null
+ val master = hazlecastClusterService.masterMember("system").memberAddress
+ val members = hazlecastClusterService.allMembers().map { it.memberAddress }
+ log.info("Cluster Members for($self): master($master) Members($members)")
+ }
+
+ val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56")
+ assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size")
+ log.info("Cluster applicationMembers ($applicationMembers)")
+ }
+}
+
+open class SampleSchedulerTask : Runnable, Serializable {
+ private val log = logger(SampleSchedulerTask::class)
+ override fun run() {
+ log.info("I am scheduler action")
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml
new file mode 100644
index 000000000..cbf488c95
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml
@@ -0,0 +1,18 @@
+hazelcast:
+ cluster-name: ${CLUSTER_ID}
+ instance-name: node-5679
+ lite-member:
+ enabled: false
+ cp-subsystem:
+ cp-member-count: 3
+ group-size: 3
+# network:
+# join:
+# multicast:
+# enabled: false
+# kubernetes:
+# enabled: true
+# namespace: MY-KUBERNETES-NAMESPACE
+# service-name: MY-SERVICE-NAME
+# service-label-name: MY-SERVICE-LABEL-NAME
+# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml
new file mode 100644
index 000000000..356be1d05
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml
@@ -0,0 +1,18 @@
+hazelcast:
+ cluster-name: ${CLUSTER_ID}
+ instance-name: node-5680
+ lite-member:
+ enabled: false
+ cp-subsystem:
+ cp-member-count: 3
+ group-size: 3
+# network:
+# join:
+# multicast:
+# enabled: false
+# kubernetes:
+# enabled: true
+# namespace: MY-KUBERNETES-NAMESPACE
+# service-name: MY-SERVICE-NAME
+# service-label-name: MY-SERVICE-LABEL-NAME
+# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml
new file mode 100644
index 000000000..d256f49e3
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml
@@ -0,0 +1,18 @@
+hazelcast:
+ cluster-name: ${CLUSTER_ID}
+ instance-name: node-5681
+ lite-member:
+ enabled: false
+ cp-subsystem:
+ cp-member-count: 3
+ group-size: 3
+# network:
+# join:
+# multicast:
+# enabled: false
+# kubernetes:
+# enabled: true
+# namespace: MY-KUBERNETES-NAMESPACE
+# service-name: MY-SERVICE-NAME
+# service-label-name: MY-SERVICE-LABEL-NAME
+# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml
new file mode 100644
index 000000000..9c7d566db
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml
@@ -0,0 +1,18 @@
+hazelcast:
+ cluster-name: ${CLUSTER_ID}
+ instance-name: node-5682
+ lite-member:
+ enabled: true
+ cp-subsystem:
+ cp-member-count: 3
+ group-size: 3
+# network:
+# join:
+# multicast:
+# enabled: false
+# kubernetes:
+# enabled: true
+# namespace: MY-KUBERNETES-NAMESPACE
+# service-name: MY-SERVICE-NAME
+# service-label-name: MY-SERVICE-LABEL-NAME
+# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml
new file mode 100644
index 000000000..e60b5dfc4
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml
@@ -0,0 +1,13 @@
+hazelcast-client:
+ cluster-name: ${CLUSTER_ID}
+ instance-name: ${CLUSTER_NODE_ID}
+
+ network:
+ cluster-members:
+ - 127.0.0.1:5701
+# kubernetes:
+# enabled: true
+# namespace: MY-KUBERNETES-NAMESPACE
+# service-name: MY-SERVICE-NAME
+# service-label-name: MY-SERVICE-LABEL-NAME
+# service-label-value: MY-SERVICE-LABEL-VALUE
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml
new file mode 100644
index 000000000..dcecf454f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml
@@ -0,0 +1,18 @@
+hazelcast:
+ cluster-name: ${CLUSTER_ID}
+ instance-name: ${CLUSTER_NODE_ID}
+ lite-member:
+ enabled: true
+ cp-subsystem:
+ cp-member-count: 3
+ group-size: 3
+# network:
+# join:
+# multicast:
+# enabled: false
+# kubernetes:
+# enabled: true
+# namespace: MY-KUBERNETES-NAMESPACE
+# service-name: MY-SERVICE-NAME
+# service-label-name: MY-SERVICE-LABEL-NAME
+# service-label-value: MY-SERVICE-LABEL-VALUE \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml
index 016d48636..5275f4029 100644
--- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml
@@ -1,5 +1,5 @@
<!--
- ~ Copyright © 2017-2018 AT&T Intellectual Property.
+ ~ Copyright © 2018-2019 AT&T Intellectual Property.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@
<logger name="org.springframework.test" level="warn"/>
<logger name="org.springframework" level="warn"/>
<logger name="org.hibernate" level="info"/>
- <logger name="io.atomix" level="warn"/>
+ <logger name="com.hazelcast" level="warn"/>
<logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
<root level="warn">