diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules')
19 files changed, 632 insertions, 553 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 50cc44279..20aef3498 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -238,7 +238,6 @@ object BluePrintConstants { const val PROPERTY_CLUSTER_ID = "CLUSTER_ID" const val PROPERTY_CLUSTER_NODE_ID = "CLUSTER_NODE_ID" const val PROPERTY_CLUSTER_NODE_ADDRESS = "CLUSTER_NODE_ADDRESS" - const val PROPERTY_CLUSTER_MEMBERS = "CLUSTER_MEMBERS" - const val PROPERTY_CLUSTER_STORAGE_PATH = "CLUSTER_STORAGE_PATH" + const val PROPERTY_CLUSTER_JOIN_AS_CLIENT = "CLUSTER_JOIN_AS_CLIENT" const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE" } diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml deleted file mode 100644 index 7fa7b452a..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Copyright © 2018-2019 AT&T Intellectual Property. - ~ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> - <artifactId>commons</artifactId> - <version>0.7.0-SNAPSHOT</version> - </parent> - - <artifactId>atomix-lib</artifactId> - <packaging>jar</packaging> - - <name>Blueprints Processor Atomix Lib</name> - <description>Blueprints Processor Atomix Lib</description> - - <dependencies> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix</artifactId> - </dependency> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix-raft</artifactId> - </dependency> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix-primary-backup</artifactId> - </dependency> - <dependency> - <groupId>io.atomix</groupId> - <artifactId>atomix-gossip</artifactId> - </dependency> - - <dependency> - <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> - <artifactId>db-lib</artifactId> - </dependency> - </dependencies> -</project> diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt deleted file mode 100644 index 17d243620..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibExtensions.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix - -import com.fasterxml.jackson.databind.JsonNode -import io.atomix.core.map.DistributedMap -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException - -fun <T : Map<String, JsonNode>> T.toDistributedMap(): DistributedMap<String, JsonNode> { - return if (this != null && this is DistributedMap<*, *>) this as DistributedMap<String, JsonNode> - else throw BluePrintProcessorException("map is not of type DistributedMap") -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt deleted file mode 100644 index 214a14310..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/service/AtomixBluePrintClusterService.kt +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix.service - -import io.atomix.cluster.ClusterMembershipEvent -import io.atomix.core.Atomix -import io.atomix.core.lock.DistributedLock -import kotlinx.coroutines.delay -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils.AtomixLibUtils -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.springframework.stereotype.Service -import java.time.Duration -import java.util.concurrent.CompletableFuture - -@Service -open class AtomixBluePrintClusterService : BluePrintClusterService { - - private val log = logger(AtomixBluePrintClusterService::class) - - lateinit var atomix: Atomix - - override suspend fun startCluster(clusterInfo: ClusterInfo) { - log.info( - "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + - "starting with members(${clusterInfo.clusterMembers})" - ) - - /** Create Atomix cluster either from config file or default multi-cast cluster*/ - atomix = if (!clusterInfo.configFile.isNullOrEmpty()) { - AtomixLibUtils.configAtomix(clusterInfo.configFile!!) - } else { - AtomixLibUtils.defaultMulticastAtomix(clusterInfo) - } - - /** Listen for the member chaneg events */ - atomix.membershipService.addListener { membershipEvent -> - when (membershipEvent.type()) { - ClusterMembershipEvent.Type.MEMBER_ADDED -> log.info("Member Added : ${membershipEvent.subject()}") - ClusterMembershipEvent.Type.MEMBER_REMOVED -> log.info("Member Removed: ${membershipEvent.subject()}") - ClusterMembershipEvent.Type.REACHABILITY_CHANGED -> log.info("Reachability Changed : ${membershipEvent.subject()}") - ClusterMembershipEvent.Type.METADATA_CHANGED -> log.info("Changed : ${membershipEvent.subject()}") - else -> log.info("Member event unknown") - } - } - /** Start and Join the Cluster */ - atomix.start().join() - log.info( - "Cluster(${clusterInfo.id}) node(${clusterInfo.nodeId}), node address(${clusterInfo.nodeAddress}) " + - "created successfully...." - ) - - /** Receive ping from network */ - val pingHandler = { message: String -> - log.info("####### ping message received : $message") - CompletableFuture.completedFuture(message) - } - atomix.communicationService.subscribe("ping", pingHandler) - - /** Ping the network */ - atomix.communicationService.broadcast( - "ping", - "ping from node(${clusterInfo.nodeId})" - ) - } - - override fun clusterJoined(): Boolean { - return atomix.isRunning - } - - override suspend fun masterMember(partitionGroup: String): ClusterMember { - check(::atomix.isInitialized) { "failed to start and join cluster" } - check(atomix.isRunning) { "cluster is not running" } - val masterId = atomix.partitionService - .getPartitionGroup(partitionGroup) - .getPartition("1").primary() - val masterMember = atomix.membershipService.getMember(masterId) - return ClusterMember( - id = masterMember.id().id(), - memberAddress = masterMember.address().toString() - ) - } - - override suspend fun allMembers(): Set<ClusterMember> { - check(::atomix.isInitialized) { "failed to start and join cluster" } - check(atomix.isRunning) { "cluster is not running" } - - return atomix.membershipService.members.map { - ClusterMember( - id = it.id().id(), - memberAddress = it.address().toString() - ) - }.toSet() - } - - override suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> { - check(::atomix.isInitialized) { "failed to start and join cluster" } - check(atomix.isRunning) { "cluster is not running" } - - return atomix.membershipService.members.filter { - it.id().id().startsWith(memberPrefix, true) - }.map { ClusterMember(id = it.id().id(), memberAddress = it.host()) } - .toSet() - } - - override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { - check(::atomix.isInitialized) { "failed to start and join cluster" } - return AtomixLibUtils.distributedMapStore<T>(atomix, name) - } - - /** The DistributedLock is a distributed implementation of Java’s Lock. - * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to - * determine ordering of multiple concurrent lock holders. - * DistributedLocks are designed to account for failures within the cluster. - * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, - * the lock will be released and granted to the next waiting process. * - */ - override suspend fun clusterLock(name: String): ClusterLock { - check(::atomix.isInitialized) { "failed to start and join cluster" } - return ClusterLockImpl(atomix, name) - } - - override suspend fun shutDown(duration: Duration) { - if (::atomix.isInitialized) { - val shutDownMilli = duration.toMillis() - log.info("Received cluster shutdown request, shutdown in ($shutDownMilli)ms") - delay(shutDownMilli) - atomix.stop() - } - } -} - -open class ClusterLockImpl(private val atomix: Atomix, private val name: String) : ClusterLock { - val log = logger(ClusterLockImpl::class) - - lateinit var distributedLock: DistributedLock - - override fun name(): String { - return distributedLock.name() - } - - override suspend fun lock() { - distributedLock = AtomixLibUtils.distributedLock(atomix, name) - distributedLock.lock() - log.debug("Cluster lock($name) created..") - } - - override suspend fun tryLock(timeout: Long): Boolean { - distributedLock = AtomixLibUtils.distributedLock(atomix, name) - return distributedLock.tryLock(Duration.ofMillis(timeout)) - } - - override suspend fun unLock() { - distributedLock.unlock() - log.debug("Cluster unlock(${name()}) successfully..") - } - - override fun isLocked(): Boolean { - return distributedLock.isLocked - } - - override fun close() { - if (::distributedLock.isInitialized) { - distributedLock.close() - } - } -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt deleted file mode 100644 index 9be15f2e3..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/utils/AtomixLibUtils.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix.utils - -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.ArrayNode -import com.fasterxml.jackson.databind.node.MissingNode -import com.fasterxml.jackson.databind.node.NullNode -import com.fasterxml.jackson.databind.node.ObjectNode -import io.atomix.core.Atomix -import io.atomix.core.lock.AtomicLock -import io.atomix.core.lock.DistributedLock -import io.atomix.core.map.DistributedMap -import io.atomix.protocols.backup.MultiPrimaryProtocol -import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup -import io.atomix.protocols.raft.partition.RaftPartitionGroup -import io.atomix.utils.net.Address -import org.jsoup.nodes.TextNode -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile - -object AtomixLibUtils { - private val log = logger(AtomixLibUtils::class) - - fun configAtomix(filePath: String): Atomix { - val configFile = normalizedFile(filePath) - return Atomix.builder(configFile.absolutePath).build() - } - - fun defaultMulticastAtomix( - clusterInfo: ClusterInfo, - raftPartitions: Int = 1, - primaryBackupPartitions: Int = 32 - ): Atomix { - - val nodeId = clusterInfo.nodeId - - val raftPartitionGroup = RaftPartitionGroup.builder("system") - .withNumPartitions(raftPartitions) - .withMembers(clusterInfo.clusterMembers) - .withDataDirectory(normalizedFile("${clusterInfo.storagePath}/data-$nodeId")) - .build() - - val primaryBackupGroup = - PrimaryBackupPartitionGroup.builder("data") - .withNumPartitions(primaryBackupPartitions) - .build() - - return Atomix.builder() - .withMemberId(nodeId) - .withAddress(Address.from(clusterInfo.nodeAddress)) - .withManagementGroup(raftPartitionGroup) - .withPartitionGroups(primaryBackupGroup) - .withMulticastEnabled() - .build() - } - - fun <T> distributedMapStore(atomix: Atomix, storeName: String, numBackups: Int = 2): DistributedMap<String, T> { - check(atomix.isRunning) { "Cluster is not running, couldn't create distributed store($storeName)" } - - val protocol = MultiPrimaryProtocol.builder() - .withBackups(numBackups) - .build() - - return atomix.mapBuilder<String, T>(storeName) - .withProtocol(protocol) - .withCacheEnabled() - .withValueType(JsonNode::class.java) - .withExtraTypes( - JsonNode::class.java, TextNode::class.java, ObjectNode::class.java, - ArrayNode::class.java, NullNode::class.java, MissingNode::class.java - ) - .build() - } - - fun distributedLock(atomix: Atomix, lockName: String, numBackups: Int = 2): DistributedLock { - check(atomix.isRunning) { "Cluster is not running, couldn't create distributed lock($lockName)" } - - val protocol = MultiPrimaryProtocol.builder() - .withBackups(numBackups) - .build() - return atomix.lockBuilder(lockName) - .withProtocol(protocol) - .build() - } - - /** get Atomic distributed lock, to get lock fence information */ - fun atomicLock(atomix: Atomix, lockName: String, numBackups: Int = 2): AtomicLock { - check(atomix.isRunning) { "Cluster is not running, couldn't create atomic lock($lockName)" } - - val protocol = MultiPrimaryProtocol.builder() - .withBackups(numBackups) - .build() - - return atomix.atomicLockBuilder(lockName) - .withProtocol(protocol) - .build() - } -} diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt deleted file mode 100644 index 67bf4cabb..000000000 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/AtomixBluePrintClusterServiceTest.kt +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.atomix - -import com.fasterxml.jackson.databind.JsonNode -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.delay -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import org.junit.Before -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive -import org.onap.ccsdk.cds.controllerblueprints.core.deleteNBDir -import org.onap.ccsdk.cds.controllerblueprints.core.logger -import kotlin.test.assertNotNull -import kotlin.test.assertTrue - -class AtomixBluePrintClusterServiceTest { - private val log = logger(AtomixBluePrintClusterServiceTest::class) - - @Before - fun init() { - runBlocking { - deleteNBDir("target/cluster") - } - } - - /** Testing two cluster with distributed map store creation, This is time consuming test case, taks around 10s **/ - @Test - fun testClusterJoin() { - runBlocking { - val bluePrintClusterServiceOne = - createCluster(arrayListOf(5679, 5680)).toMutableList() - // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5681, 5682), arrayListOf(5679, 5680)) - // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) - val bluePrintClusterService = bluePrintClusterServiceOne[0] - log.info("Members : ${bluePrintClusterService.allMembers()}") - log.info("Master(System) Members : ${bluePrintClusterService.masterMember("system")}") - log.info("Master(Data) Members : ${bluePrintClusterService.masterMember("data")}") - testDistributedStore(bluePrintClusterServiceOne) - testDistributedLock(bluePrintClusterServiceOne) - } - } - - private suspend fun createCluster( - ports: List<Int>, - otherClusterPorts: List<Int>? = null - ): List<BluePrintClusterService> { - - return withContext(Dispatchers.Default) { - val clusterMembers = ports.map { "node-$it" }.toMutableList() - /** Add the other cluster as members */ - if (!otherClusterPorts.isNullOrEmpty()) { - val otherClusterMembers = otherClusterPorts.map { "node-$it" }.toMutableList() - clusterMembers.addAll(otherClusterMembers) - } - val deferred = ports.map { port -> - async(Dispatchers.IO) { - val nodeId = "node-$port" - log.info("********** Starting node($nodeId) on port($port)") - val clusterInfo = ClusterInfo( - id = "test-cluster", nodeId = nodeId, - clusterMembers = clusterMembers, nodeAddress = "localhost:$port", storagePath = "target/cluster" - ) - val atomixClusterService = AtomixBluePrintClusterService() - atomixClusterService.startCluster(clusterInfo) - atomixClusterService - } - } - deferred.awaitAll() - } - } - - private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) { - /** Test Distributed store creation */ - repeat(2) { storeId -> - val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>( - "blueprint-runtime-$storeId" - ).toDistributedMap() - assertNotNull(store, "failed to get store") - val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>( - "blueprint-runtime-$storeId" - ).toDistributedMap() - - store1.addListener { - log.info("Received map event : $it") - } - repeat(5) { - store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() - } - delay(10) - store.close() - } - } - - private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) { - val lockName = "sample-lock" - withContext(Dispatchers.IO) { - val deferred = async { - executeLock(bluePrintClusterServices[0], "first", lockName) - } - val deferred2 = async { - executeLock(bluePrintClusterServices[0], "second", lockName) - } - val deferred3 = async { - executeLock(bluePrintClusterServices[1], "third", lockName) - } - deferred.start() - deferred2.start() - deferred3.start() - } - } - - private suspend fun executeLock( - bluePrintClusterService: BluePrintClusterService, - lockId: String, - lockName: String - ) { - log.info("initialising $lockId lock...") - val distributedLock = bluePrintClusterService.clusterLock(lockName) - assertNotNull(distributedLock, "failed to create distributed $lockId lock") - distributedLock.lock() - assertTrue(distributedLock.isLocked(), "failed to lock $lockId") - try { - log.info("locked $lockId process for 5mSec") - delay(5) - } finally { - distributedLock.unLock() - log.info("$lockId lock released") - } - distributedLock.close() - } -} diff --git a/ms/blueprintsprocessor/modules/commons/pom.xml b/ms/blueprintsprocessor/modules/commons/pom.xml index 18ef63469..bc1616d82 100755 --- a/ms/blueprintsprocessor/modules/commons/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/pom.xml @@ -34,7 +34,6 @@ <modules> <module>processor-core</module> - <module>atomix-lib</module> <module>db-lib</module> <module>rest-lib</module> <module>dmaap-lib</module> diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml b/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml index 2f5ae6624..d08c16781 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/pom.xml @@ -32,6 +32,10 @@ <description>Blueprints Processor Core</description> <dependencies> + <dependency> + <groupId>com.hazelcast</groupId> + <artifactId>hazelcast-all</artifactId> + </dependency> <dependency> <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>blueprint-proto</artifactId> diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt index 8ef290303..85d9d5c27 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/BluePrintClusterExtensions.kt @@ -14,22 +14,19 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.atomix +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster -import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService +import com.hazelcast.cluster.Member import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService -import org.springframework.context.annotation.Configuration - -@Configuration -open class BluePrintAtomixLibConfiguration /** - * Exposed Dependency Service by this Atomix Lib Module + * Exposed Dependency Service by this Hazlecast Lib Module */ fun BluePrintDependencyService.clusterService(): BluePrintClusterService = - instance(AtomixBluePrintClusterService::class) + instance(HazlecastClusterService::class) /** Optional Cluster Service, returns only if Cluster is enabled */ fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? { @@ -37,3 +34,13 @@ fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService BluePrintDependencyService.clusterService() } else null } + +/** Extension to convert Hazelcast Member to Blueprints Cluster Member */ +fun Member.toClusterMember(): ClusterMember { + val memberName: String = this.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) ?: this.uuid.toString() + return ClusterMember( + id = this.uuid.toString(), + name = memberName, + memberAddress = this.address.toString() + ) +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt new file mode 100644 index 000000000..83a04d653 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterService.kt @@ -0,0 +1,252 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.hazelcast.client.HazelcastClient +import com.hazelcast.client.config.ClientConfig +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.cluster.MembershipEvent +import com.hazelcast.cluster.MembershipListener +import com.hazelcast.config.Config +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.config.MemberAttributeConfig +import com.hazelcast.core.Hazelcast +import com.hazelcast.core.HazelcastInstance +import com.hazelcast.cp.lock.FencedLock +import com.hazelcast.scheduledexecutor.IScheduledExecutorService +import kotlinx.coroutines.delay +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterMember +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils +import org.springframework.stereotype.Service +import java.time.Duration +import java.util.concurrent.TimeUnit + +@Service +open class HazlecastClusterService : BluePrintClusterService { + + private val log = logger(HazlecastClusterService::class) + lateinit var hazelcast: HazelcastInstance + var joinedClient = false + var joinedLite = false + + override suspend fun <T> startCluster(configuration: T) { + /** Get the Hazelcast Cliet or Server instance */ + hazelcast = + when (configuration) { + is Config -> { + joinedLite = configuration.isLiteMember + Hazelcast.newHazelcastInstance(configuration) + } + is ClientConfig -> { + joinedClient = true + HazelcastClient.newHazelcastClient(configuration) + } + is ClusterInfo -> { + + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, configuration.id) + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, configuration.nodeId) + + val memberAttributeConfig = MemberAttributeConfig() + memberAttributeConfig.setAttribute( + BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, + configuration.nodeId + ) + + val configFile = configuration.configFile + /** Check file exists */ + val clusterConfigFile = normalizedFile(configuration.configFile) + check(clusterConfigFile.absolutePath.endsWith("yaml", true)) { + "couldn't understand cluster config file(${configuration.configFile}) format, it should be yaml" + } + check(clusterConfigFile.exists()) { + "couldn't file cluster configuration file(${clusterConfigFile.absolutePath})" + } + log.info("****** Cluster configuration file(${clusterConfigFile.absolutePath}) ****") + + /** Hazelcast Client from config file */ + if (configuration.joinAsClient) { + /** Set the configuration file to system properties, so that Hazelcast will read automatically */ + System.setProperty("hazelcast.client.config", clusterConfigFile.absolutePath) + joinedClient = true + val hazelcastClientConfiguration = YamlClientConfigBuilder().build() + hazelcastClientConfiguration.properties = configuration.properties + HazelcastClient.newHazelcastClient(hazelcastClientConfiguration) + } else { + /** Hazelcast Server from config file */ + val hazelcastServerConfiguration = FileSystemYamlConfig(normalizedFile(configFile)) + hazelcastServerConfiguration.properties = configuration.properties + hazelcastServerConfiguration.memberAttributeConfig = memberAttributeConfig + joinedLite = hazelcastServerConfiguration.isLiteMember + Hazelcast.newHazelcastInstance(hazelcastServerConfiguration) + } + } + else -> { + throw BluePrintProcessorException("couldn't understand the cluster configuration") + } + } + + /** Add the Membership Listeners */ + hazelcast.cluster.addMembershipListener(BlueprintsClusterMembershipListener(this)) + log.info( + "Cluster(${hazelcast.config.clusterName}) node(${hazelcast.name}) created successfully...." + ) + } + + override fun isClient(): Boolean { + return joinedClient + } + + override fun isLiteMember(): Boolean { + return joinedLite + } + + override fun clusterJoined(): Boolean { + return hazelcast.lifecycleService.isRunning + } + + override suspend fun masterMember(partitionGroup: String): ClusterMember { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.first().toClusterMember() + } + + override suspend fun allMembers(): Set<ClusterMember> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.cluster.members.map { it.toClusterMember() }.toSet() + } + + override suspend fun applicationMembers(appName: String): Set<ClusterMember> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcastApplicationMembers(appName).mapNotNull { it.value.toClusterMember() }.toSet() + } + + override suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getMap<String, T>(name) + } + + /** + * The DistributedLock is a distributed implementation of Java’s Lock. + * This API provides monotonically increasing, globally unique lock instance identifiers that can be used to + * determine ordering of multiple concurrent lock holders. + * DistributedLocks are designed to account for failures within the cluster. + * When a lock holder crashes or becomes disconnected from the partition by which the lock’s state is controlled, + * the lock will be released and granted to the next waiting process. + */ + override suspend fun clusterLock(name: String): ClusterLock { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return ClusterLockImpl(hazelcast, name) + } + + /** Return interface may change and it will be included in BluePrintClusterService */ + @UseExperimental + suspend fun clusterScheduler(name: String): IScheduledExecutorService { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + return hazelcast.getScheduledExecutorService(name) + } + + override suspend fun shutDown(duration: Duration) { + if (::hazelcast.isInitialized && clusterJoined()) { + delay(duration.toMillis()) + hazelcast.lifecycleService.terminate() + } + } + + /** Utils */ + suspend fun myHazelcastApplicationMembers(): Map<String, Member> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + check(!isClient()) { "not supported for cluster client members." } + return hazelcastApplicationMembers(ClusterUtils.applicationName()) + } + + suspend fun hazelcastApplicationMembers(appName: String): Map<String, Member> { + check(::hazelcast.isInitialized) { "failed to start and join cluster" } + val applicationMembers: MutableMap<String, Member> = hashMapOf() + hazelcast.cluster.members.map { member -> + val memberName: String = member.getAttribute(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID) + if (memberName.startsWith(appName, true)) { + applicationMembers[memberName] = member + } + } + return applicationMembers + } +} + +open class BlueprintsClusterMembershipListener(val hazlecastClusterService: HazlecastClusterService) : + MembershipListener { + private val log = logger(BlueprintsClusterMembershipListener::class) + + override fun memberRemoved(membershipEvent: MembershipEvent) { + log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Removed: $membershipEvent") + } + + override fun memberAdded(membershipEvent: MembershipEvent) { + log.info("${hazlecastClusterService.hazelcast.cluster.localMember} : Member Added : $membershipEvent") + } +} + +open class ClusterLockImpl(private val hazelcast: HazelcastInstance, private val name: String) : ClusterLock { + private val log = logger(ClusterLockImpl::class) + + lateinit var distributedLock: FencedLock + + override fun name(): String { + return distributedLock.name + } + + override suspend fun lock() { + distributedLock = hazelcast.cpSubsystem.getLock(name) + distributedLock.lock() + log.trace("Cluster lock($name) created..") + } + + override suspend fun tryLock(timeout: Long): Boolean { + distributedLock = hazelcast.cpSubsystem.getLock(name) + return distributedLock.tryLock(timeout, TimeUnit.MILLISECONDS) + } + + override suspend fun unLock() { + distributedLock.unlock() + log.trace("Cluster unlock(${name()}) successfully..") + } + + override fun isLocked(): Boolean { + return distributedLock.isLocked + } + + override suspend fun fenceLock(): String { + distributedLock = hazelcast.cpSubsystem.getLock(name) + val fence = distributedLock.lockAndGetFence() + log.trace("Cluster lock($name) fence($fence) created..") + return fence.toString() + } + + override suspend fun tryFenceLock(timeout: Long): String { + distributedLock = hazelcast.cpSubsystem.getLock(name) + return distributedLock.tryLockAndGetFence(timeout, TimeUnit.MILLISECONDS).toString() + } + + override fun close() { + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt index f994628a2..53f18d38a 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintClusterService.kt @@ -17,23 +17,33 @@ package org.onap.ccsdk.cds.blueprintsprocessor.core.service import java.time.Duration +import java.util.Properties interface BluePrintClusterService { /** Start the cluster with [clusterInfo], By default clustering service is disabled. * Application module has to start cluster */ - suspend fun startCluster(clusterInfo: ClusterInfo) + suspend fun <T> startCluster(configuration: T) fun clusterJoined(): Boolean + fun isClient(): Boolean + + fun isLiteMember(): Boolean + /** Returns [partitionGroup] master member */ suspend fun masterMember(partitionGroup: String): ClusterMember /** Returns all the data cluster members */ suspend fun allMembers(): Set<ClusterMember> - /** Returns data cluster members starting with prefix */ - suspend fun clusterMembersForPrefix(memberPrefix: String): Set<ClusterMember> + /** + * Returns application cluster members for [appName] joined as server or lite member, + * Node joined as client won't be visible. Here the assumption is node-id is combination of + * application id and replica number, for an example Application cds-cluster then the node ids will be + * cds-cluster-1, cds-cluster-2, cds-cluster-3 + */ + suspend fun applicationMembers(appName: String): Set<ClusterMember> /** Create and get or get the distributed data map store with [name] */ suspend fun <T> clusterMapStore(name: String): MutableMap<String, T> @@ -47,19 +57,25 @@ interface BluePrintClusterService { data class ClusterInfo( val id: String, - var configFile: String? = null, val nodeId: String, - val nodeAddress: String, - var clusterMembers: List<String>, - var storagePath: String + var joinAsClient: Boolean = false, + var properties: Properties?, + var configFile: String ) -data class ClusterMember(val id: String, val memberAddress: String?, val state: String? = null) +data class ClusterMember( + val id: String, + val name: String, + val memberAddress: String?, + val state: String? = null +) interface ClusterLock { fun name(): String suspend fun lock() + suspend fun fenceLock(): String suspend fun tryLock(timeout: Long): Boolean + suspend fun tryFenceLock(timeout: Long): String suspend fun unLock() fun isLocked(): Boolean fun close() diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt new file mode 100644 index 000000000..b298eacae --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/cluster/HazlecastClusterServiceTest.kt @@ -0,0 +1,231 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.cluster + +import com.fasterxml.jackson.databind.JsonNode +import com.hazelcast.client.config.YamlClientConfigBuilder +import com.hazelcast.cluster.Member +import com.hazelcast.config.FileSystemYamlConfig +import com.hazelcast.map.IMap +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterInfo +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile +import java.io.Serializable +import java.time.Duration +import java.util.Properties +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertTrue + +class HazlecastClusterServiceTest { + private val log = logger(HazlecastClusterServiceTest::class) + private val clusterSize = 3 + + @Test + fun testClientFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + System.setProperty( + "hazelcast.client.config", + normalizedFile("./src/test/resources/hazelcast/hazelcast-client.yaml").absolutePath + ) + val config = YamlClientConfigBuilder().build() + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testServerFileSystemYamlConfig() { + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_ID, "test-cluster") + System.setProperty(BluePrintConstants.PROPERTY_CLUSTER_NODE_ID, "node-1234") + val configFile = normalizedFile("./src/test/resources/hazelcast/hazelcast.yaml") + val config = FileSystemYamlConfig(configFile) + assertNotNull(config) + assertEquals("test-cluster", config.clusterName) + assertEquals("node-1234", config.instanceName) + } + + @Test + fun testClusterJoin() { + runBlocking { + val bluePrintClusterServiceOne = + createCluster(arrayListOf(5679, 5680, 5681)).toMutableList() + // delay(1000) + // Join as Hazlecast Management Node + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), true) + // val bluePrintClusterServiceTwo = createCluster(arrayListOf(5682), false) + // bluePrintClusterServiceOne.addAll(bluePrintClusterServiceTwo) + printReachableMembers(bluePrintClusterServiceOne) + testDistributedStore(bluePrintClusterServiceOne) + testDistributedLock(bluePrintClusterServiceOne) + + // executeScheduler(bluePrintClusterServiceOne[0]) + // delay(1000) + // Shutdown + shutdown(bluePrintClusterServiceOne) + } + } + + private suspend fun createCluster( + ports: List<Int>, + joinAsClient: Boolean? = false + ): List<BluePrintClusterService> { + + return withContext(Dispatchers.Default) { + val deferred = ports.map { port -> + async(Dispatchers.IO) { + val nodeId = "node-$port" + log.info("********** Starting node($nodeId) on port($port)") + val properties = Properties() + properties["hazelcast.logging.type"] = "slf4j" + val clusterInfo = + if (joinAsClient!!) { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = true, + configFile = "./src/test/resources/hazelcast/hazelcast-client.yaml", + properties = properties + ) + } else { + ClusterInfo( + id = "test-cluster", nodeId = nodeId, joinAsClient = false, + configFile = "./src/test/resources/hazelcast/hazelcast-$port.yaml", + properties = properties + ) + } + val hazlecastClusterService = HazlecastClusterService() + hazlecastClusterService.startCluster(clusterInfo) + hazlecastClusterService + } + } + deferred.awaitAll() + } + } + + private suspend fun shutdown(bluePrintClusterServices: List<BluePrintClusterService>) { + bluePrintClusterServices.forEach { bluePrintClusterService -> + bluePrintClusterService.shutDown(Duration.ofMillis(10)) + } + } + + private suspend fun testDistributedStore(bluePrintClusterServices: List<BluePrintClusterService>) { + /** Test Distributed store creation */ + repeat(2) { storeId -> + val store = bluePrintClusterServices[0].clusterMapStore<JsonNode>( + "blueprint-runtime-$storeId" + ) as IMap + assertNotNull(store, "failed to get store") + repeat(5) { + store["key-$storeId-$it"] = "value-$it".asJsonPrimitive() + } + + val store1 = bluePrintClusterServices[1].clusterMapStore<JsonNode>( + "blueprint-runtime-$storeId" + ) as IMap + + store1.values.map { + log.trace("Received map event : $it") + } + delay(5) + store.clear() + } + } + + private suspend fun testDistributedLock(bluePrintClusterServices: List<BluePrintClusterService>) { + val lockName = "sample-lock" + withContext(Dispatchers.IO) { + val deferred = async { + executeLock(bluePrintClusterServices[0], "first", lockName) + } + val deferred2 = async { + executeLock(bluePrintClusterServices[0], "second", lockName) + } + val deferred3 = async { + executeLock(bluePrintClusterServices[2], "third", lockName) + } + deferred.start() + deferred2.start() + deferred3.start() + } + } + + private suspend fun executeLock( + bluePrintClusterService: BluePrintClusterService, + lockId: String, + lockName: String + ) { + log.info("initialising $lockId lock...") + val distributedLock = bluePrintClusterService.clusterLock(lockName) + assertNotNull(distributedLock, "failed to create distributed $lockId lock") + distributedLock.lock() + assertTrue(distributedLock.isLocked(), "failed to lock $lockId") + try { + log.info("locked $lockId process for 5mSec") + delay(5) + } finally { + distributedLock.unLock() + log.info("$lockId lock released") + } + distributedLock.close() + } + + private suspend fun executeScheduler(bluePrintClusterService: BluePrintClusterService) { + log.info("initialising ...") + val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService + + val memberNameMap = bluePrintClusterService.clusterMapStore<Member>("member-name-map") as IMap + assertEquals(3, memberNameMap.size, "failed to match member size") + memberNameMap.forEach { (key, value) -> log.info("nodeId($key), Member($value)") } + val scheduler = hazlecastClusterService.clusterScheduler("cleanup") + // scheduler.scheduleOnAllMembers(SampleSchedulerTask(), 0, TimeUnit.SECONDS) + // scheduler.scheduleOnKeyOwnerAtFixedRate(SampleSchedulerTask(), "node-5680",0, 1, TimeUnit.SECONDS) + // scheduler.scheduleAtFixedRate(SampleSchedulerTask(), 0, 1, TimeUnit.SECONDS) + // scheduler.scheduleOnAllMembersAtFixedRate(SampleSchedulerTask(), 0, 5, TimeUnit.SECONDS) + } + + private suspend fun printReachableMembers(bluePrintClusterServices: List<BluePrintClusterService>) { + bluePrintClusterServices.forEach { bluePrintClusterService -> + val hazlecastClusterService = bluePrintClusterService as HazlecastClusterService + val hazelcast = hazlecastClusterService.hazelcast + val self = if (!bluePrintClusterService.isClient()) hazelcast.cluster.localMember else null + val master = hazlecastClusterService.masterMember("system").memberAddress + val members = hazlecastClusterService.allMembers().map { it.memberAddress } + log.info("Cluster Members for($self): master($master) Members($members)") + } + + val applicationMembers = bluePrintClusterServices[0].applicationMembers("node-56") + assertEquals(clusterSize, applicationMembers.size, "failed to match applications member size") + log.info("Cluster applicationMembers ($applicationMembers)") + } +} + +open class SampleSchedulerTask : Runnable, Serializable { + private val log = logger(SampleSchedulerTask::class) + override fun run() { + log.info("I am scheduler action") + } +} diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml new file mode 100644 index 000000000..cbf488c95 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5679.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5679 + lite-member: + enabled: false + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml new file mode 100644 index 000000000..356be1d05 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5680.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5680 + lite-member: + enabled: false + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml new file mode 100644 index 000000000..d256f49e3 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5681.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5681 + lite-member: + enabled: false + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml new file mode 100644 index 000000000..9c7d566db --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-5682.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: node-5682 + lite-member: + enabled: true + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml new file mode 100644 index 000000000..e60b5dfc4 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast-client.yaml @@ -0,0 +1,13 @@ +hazelcast-client: + cluster-name: ${CLUSTER_ID} + instance-name: ${CLUSTER_NODE_ID} + + network: + cluster-members: + - 127.0.0.1:5701 +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml new file mode 100644 index 000000000..dcecf454f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/hazelcast/hazelcast.yaml @@ -0,0 +1,18 @@ +hazelcast: + cluster-name: ${CLUSTER_ID} + instance-name: ${CLUSTER_NODE_ID} + lite-member: + enabled: true + cp-subsystem: + cp-member-count: 3 + group-size: 3 +# network: +# join: +# multicast: +# enabled: false +# kubernetes: +# enabled: true +# namespace: MY-KUBERNETES-NAMESPACE +# service-name: MY-SERVICE-NAME +# service-label-name: MY-SERVICE-LABEL-NAME +# service-label-value: MY-SERVICE-LABEL-VALUE
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml index 016d48636..5275f4029 100644 --- a/ms/blueprintsprocessor/modules/commons/atomix-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/test/resources/logback-test.xml @@ -1,5 +1,5 @@ <!-- - ~ Copyright © 2017-2018 AT&T Intellectual Property. + ~ Copyright © 2018-2019 AT&T Intellectual Property. ~ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ <logger name="org.springframework.test" level="warn"/> <logger name="org.springframework" level="warn"/> <logger name="org.hibernate" level="info"/> - <logger name="io.atomix" level="warn"/> + <logger name="com.hazelcast" level="warn"/> <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/> <root level="warn"> |